Posted to commits@hbase.apache.org by ap...@apache.org on 2017/05/03 23:03:24 UTC

hbase git commit: HBASE-16466 Snapshots support in VerifyReplication tool

Repository: hbase
Updated Branches:
  refs/heads/master 78f6799f4 -> 2de6b051f


HBASE-16466 Snapshots support in VerifyReplication tool

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2de6b051
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2de6b051
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2de6b051

Branch: refs/heads/master
Commit: 2de6b051f67b6a55eda8d4e247328fda24484adb
Parents: 78f6799
Author: Sukumar Maddineni <su...@gmail.com>
Authored: Wed May 3 13:35:26 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed May 3 14:22:18 2017 -0700

----------------------------------------------------------------------
 .../mapreduce/TableSnapshotInputFormat.java     |   5 +
 .../replication/VerifyReplication.java          | 199 +++++++++++++++----
 .../replication/TestReplicationSmallTests.java  | 123 +++++++++++-
 3 files changed, 282 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2de6b051/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 15d403f..a5f699d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -120,6 +120,11 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
     public void readFields(DataInput in) throws IOException {
       delegate.readFields(in);
     }
+
+    public HRegionInfo getRegionInfo() {
+      return delegate.getRegionInfo();
+    }
+
   }
 
   @VisibleForTesting

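Note: the new getRegionInfo() accessor on TableSnapshotRegionSplit exposes the
split's region metadata, which VerifyReplication needs below to bound each
mapper's peer-side scan by the region end key. A minimal sketch of that
pattern, assuming a mapper context and a pre-built Scan:

    InputSplit split = context.getInputSplit();
    byte[] endRow;
    if (split instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
      // Snapshot-backed split: the end key comes from the region metadata.
      endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) split)
          .getRegionInfo().getEndKey();
    } else {
      // Live-table split.
      endRow = ((TableSplit) split).getEndRow();
    }
    scan.setStopRow(endRow);
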
http://git-wip-us.apache.org/repos/asf/hbase/blob/2de6b051/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 3f8317b..3d32edd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -25,6 +25,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
@@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterList;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -50,14 +54,19 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This map-only job compares the data from a local table with a remote one.
  * Every cell is compared and must have exactly the same keys (even timestamp)
@@ -75,18 +84,31 @@ public class VerifyReplication extends Configured implements Tool {
 
   public final static String NAME = "verifyrep";
   private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
-  static long startTime = 0;
-  static long endTime = Long.MAX_VALUE;
-  static int batch = -1;
-  static int versions = -1;
-  static String tableName = null;
-  static String families = null;
-  static String delimiter = "";
-  static String peerId = null;
-  static String rowPrefixes = null;
-  static int sleepMsBeforeReCompare = 0;
-  static boolean verbose = false;
-  static boolean includeDeletedCells = false;
+  long startTime = 0;
+  long endTime = Long.MAX_VALUE;
+  int batch = -1;
+  int versions = -1;
+  String tableName = null;
+  String families = null;
+  String delimiter = "";
+  String peerId = null;
+  String rowPrefixes = null;
+  int sleepMsBeforeReCompare = 0;
+  boolean verbose = false;
+  boolean includeDeletedCells = false;
+  //Source table snapshot name
+  String sourceSnapshotName = null;
+  //Temp location in source cluster to restore source snapshot
+  String sourceSnapshotTmpDir = null;
+  //Peer table snapshot name
+  String peerSnapshotName = null;
+  //Temp location in peer cluster to restore peer snapshot
+  String peerSnapshotTmpDir = null;
+  //Peer cluster Hadoop FS address
+  String peerFSAddress = null;
+  //Peer cluster HBase root dir location
+  String peerHBaseRootAddress = null;
+
 
   private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
 
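Note: turning these fields from static into instance state is what makes the
tool safely re-runnable within one JVM: each VerifyReplication object now
carries its own parsed options, so the restoreDefaults() reset (removed
further down) becomes unnecessary. A hedged sketch of the behavior this
enables, using the now-public doCommandLine():

    // Two instances no longer leak options into each other.
    VerifyReplication v1 = new VerifyReplication();
    v1.doCommandLine(new String[] { "--starttime=1", "2", "t1" });
    VerifyReplication v2 = new VerifyReplication();
    v2.doCommandLine(new String[] { "2", "t2" });
    // v2.startTime keeps its default of 0, unaffected by v1's arguments.
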
@@ -159,7 +181,7 @@ public class VerifyReplication extends Configured implements Tool {
         sourceConnection = ConnectionFactory.createConnection(conf);
         sourceTable = sourceConnection.getTable(tableName);
 
-        final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
+        final InputSplit tableSplit = context.getInputSplit();
 
         String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
         Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
@@ -168,8 +190,33 @@ public class VerifyReplication extends Configured implements Tool {
         replicatedConnection = ConnectionFactory.createConnection(peerConf);
         replicatedTable = replicatedConnection.getTable(tableName);
         scan.setStartRow(value.getRow());
-        scan.setStopRow(tableSplit.getEndRow());
-        replicatedScanner = replicatedTable.getScanner(scan);
+
+        byte[] endRow = null;
+        if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
+          endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
+              .getEndKey();
+        } else {
+          endRow = ((TableSplit) tableSplit).getEndRow();
+        }
+
+        scan.setStopRow(endRow);
+
+        String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
+        if (peerSnapshotName != null) {
+          String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
+          String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
+          String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
+          FileSystem.setDefaultUri(peerConf, peerFSAddress);
+          FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
+          LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
+              + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
+              + " peerFSAddress:" + peerFSAddress);
+
+          replicatedScanner = new TableSnapshotScanner(peerConf,
+              new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
+        } else {
+          replicatedScanner = replicatedTable.getScanner(scan);
+        }
         currentCompareRowInPeerTable = replicatedScanner.next();
       }
       while (true) {
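
Note: when a peer snapshot is configured, the mapper compares against a
TableSnapshotScanner, which reads the restored snapshot's store files directly
from the peer filesystem instead of going through the peer's region servers.
A standalone sketch with hypothetical names and paths:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.TableSnapshotScanner;

    Configuration peerConf = HBaseConfiguration.create();
    // restoreDir: a temp location on the peer FS holding the restored snapshot.
    TableSnapshotScanner scanner = new TableSnapshotScanner(peerConf,
        new Path("hdfs://peer-nn:8020/tmp/restore"), "peerSnapshot", new Scan());
    try {
      for (Result r = scanner.next(); r != null; r = scanner.next()) {
        // Compare r against the corresponding source-side row here.
      }
    } finally {
      scanner.close();
    }
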
@@ -282,7 +329,7 @@ public class VerifyReplication extends Configured implements Tool {
   }
 
   private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
-      final Configuration conf) throws IOException {
+      final Configuration conf, String peerId) throws IOException {
     ZooKeeperWatcher localZKW = null;
     ReplicationPeerZKImpl peer = null;
     try {
@@ -322,7 +369,7 @@ public class VerifyReplication extends Configured implements Tool {
    * @return The newly created job.
    * @throws java.io.IOException When setting up the job fails.
    */
-  public static Job createSubmittableJob(Configuration conf, String[] args)
+  public Job createSubmittableJob(Configuration conf, String[] args)
   throws IOException {
     if (!doCommandLine(args)) {
       return null;
@@ -343,7 +390,7 @@ public class VerifyReplication extends Configured implements Tool {
       conf.set(NAME+".rowPrefixes", rowPrefixes);
     }
 
-    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
+    Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
     ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
     String peerQuorumAddress = peerConfig.getClusterKey();
     LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
@@ -355,6 +402,17 @@ public class VerifyReplication extends Configured implements Tool {
     conf.setInt(NAME + ".versions", versions);
     LOG.info("Number of version: " + versions);
 
+    //Set Snapshot specific parameters
+    if (peerSnapshotName != null) {
+      conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
+      conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
+      conf.set(NAME + ".peerFSAddress", peerFSAddress);
+      conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
+
+      // This is to create HDFS delegation token for peer cluster in case of secured
+      conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress);
+    }
+
     Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
     job.setJarByClass(VerifyReplication.class);
 
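Note: MRJobConfig.JOB_NAMENODES resolves to the property
mapreduce.job.hdfs-servers; listing the peer NameNode there makes the
MapReduce client fetch an HDFS delegation token for the remote filesystem as
well, which matters only on Kerberos-secured clusters. The same effect via
the raw property name (peer address is illustrative):

    conf.set("mapreduce.job.hdfs-servers", "hdfs://peer-nn:8020");
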
@@ -378,9 +436,15 @@ public class VerifyReplication extends Configured implements Tool {
 
     setRowPrefixFilter(scan, rowPrefixes);
 
-    TableMapReduceUtil.initTableMapperJob(tableName, scan,
-        Verifier.class, null, null, job);
-
+    if (sourceSnapshotName != null) {
+      Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
+      LOG.info(
+        "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
+      TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
+        null, job, true, snapshotTempPath);
+    } else {
+      TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
+    }
     Configuration peerClusterConf = peerConfigPair.getSecond();
     // Obtain the auth token from peer cluster
     TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
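
Note: on the source side, initTableSnapshotMapperJob restores the named
snapshot under the supplied temp dir and switches the job's input format to
TableSnapshotInputFormat, so mappers scan the restored store files rather
than the live table. A hedged sketch of the call, argument by argument
(names illustrative):

    TableMapReduceUtil.initTableSnapshotMapperJob(
        "sourceSnapshot",          // snapshot to read instead of the table
        scan,                      // row range and columns to compare
        Verifier.class,            // the map-only verifier
        null, null,                // no map output key/value classes
        job,
        true,                      // ship dependency jars with the job
        new Path("/tmp/restore")); // temp dir where the snapshot is restored
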
@@ -413,15 +477,12 @@ public class VerifyReplication extends Configured implements Tool {
     scan.setStopRow(stopRow);
   }
 
-  private static boolean doCommandLine(final String[] args) {
+  @VisibleForTesting
+  public boolean doCommandLine(final String[] args) {
     if (args.length < 2) {
       printUsage(null);
       return false;
     }
-    //in case we've been run before, restore all parameters to their initial states
-    //Otherwise, if our previous run included a parameter not in args this time,
-    //we might hold on to the old value.
-    restoreDefaults();
     try {
       for (int i = 0; i < args.length; i++) {
         String cmd = args[i];
@@ -487,10 +548,47 @@ public class VerifyReplication extends Configured implements Tool {
         if (cmd.startsWith(verboseKey)) {
           verbose = true;
           continue;
-        }        
+        }
+
+        final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
+        if (cmd.startsWith(sourceSnapshotNameArgKey)) {
+          sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
+          continue;
+        }
+
+        final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
+        if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
+          sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
+          continue;
+        }
+
+        final String peerSnapshotNameArgKey = "--peerSnapshotName=";
+        if (cmd.startsWith(peerSnapshotNameArgKey)) {
+          peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
+          continue;
+        }
+
+        final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
+        if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
+          peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
+          continue;
+        }
+
+        final String peerFSAddressArgKey = "--peerFSAddress=";
+        if (cmd.startsWith(peerFSAddressArgKey)) {
+          peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
+          continue;
+        }
+
+        final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
+        if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
+          peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
+          continue;
+        }
 
         if (cmd.startsWith("--")) {
           printUsage("Invalid argument '" + cmd + "'");
+          return false;
         }
 
         if (i == args.length-2) {
@@ -501,6 +599,32 @@ public class VerifyReplication extends Configured implements Tool {
           tableName = cmd;
         }
       }
+
+      if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
+          || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
+        printUsage("Source snapshot name and snapshot temp location should be provided"
+            + " to use snapshots in source cluster");
+        return false;
+      }
+
+      if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
+          || peerHBaseRootAddress != null) {
+        if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
+            || peerHBaseRootAddress == null) {
+          printUsage(
+            "Peer snapshot name, peer snapshot temp location, Peer HBase root address and  "
+                + "peer FSAddress should be provided to use snapshots in peer cluster");
+          return false;
+        }
+      }
+
+      // This is to avoid making recompare calls to source/peer tables when snapshots are used
+      if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
+        printUsage(
+          "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
+        return false;
+      }
+
     } catch (Exception e) {
       e.printStackTrace();
       printUsage("Can't start because " + e.getMessage());
@@ -509,18 +633,6 @@ public class VerifyReplication extends Configured implements Tool {
     return true;
   }
 
-  private static void restoreDefaults() {
-    startTime = 0;
-    endTime = Long.MAX_VALUE;
-    batch = -1;
-    versions = -1;
-    tableName = null;
-    families = null;
-    peerId = null;
-    rowPrefixes = null;
-    includeDeletedCells = false;
-  }
-
   /*
    * @param errorMsg Error message.  Can be null.
    */
@@ -530,7 +642,8 @@ public class VerifyReplication extends Configured implements Tool {
     }
     System.err.println("Usage: verifyrep [--starttime=X]" +
         " [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
-        "[--batch=] [--verbose] <peerid> <tablename>");
+        "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
+            + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U]  <peerid> <tablename>");
     System.err.println();
     System.err.println("Options:");
     System.err.println(" starttime    beginning of the time range");
@@ -546,6 +659,12 @@ public class VerifyReplication extends Configured implements Tool {
     System.err.println(" recomparesleep   milliseconds to sleep before recompare row, " +
         "default value is 0 which disables the recompare.");
     System.err.println(" verbose      logs row keys of good rows");
+    System.err.println(" sourceSnapshotName  Source Snapshot Name");
+    System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
+    System.err.println(" peerSnapshotName  Peer Snapshot Name");
+    System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
+    System.err.println(" peerFSAddress      Peer cluster Hadoop FS address");
+    System.err.println(" peerHBaseRootAddress  Peer cluster HBase root location");
     System.err.println();
     System.err.println("Args:");
     System.err.println(" peerid       Id of the peer used for verification, must match the one given for replication");

http://git-wip-us.apache.org/repos/asf/hbase/blob/2de6b051/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 1c5a994..f1b2015 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -18,10 +18,7 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -33,6 +30,8 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -59,10 +58,12 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.Replication;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -73,6 +74,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import com.google.common.collect.Lists;
+
 @Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationSmallTests extends TestReplicationBase {
 
@@ -593,7 +596,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
 
   private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
       throws IOException, InterruptedException, ClassNotFoundException {
-    Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
+    Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
     if (job == null) {
       fail("Job wasn't created, see the log");
     }
@@ -864,4 +867,114 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
   }
 
+  @Test(timeout = 300000)
+  public void testVerifyReplicationSnapshotArguments() {
+    String[] args =
+        new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
+    assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
+        "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
+        tableName.getNameAsString() };
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+    args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
+        "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
+        "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
+
+    assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+  }
+
+  @Test(timeout = 300000)
+  public void testVerifyReplicationWithSnapshotSupport() throws Exception {
+    // Populate the tables, at the same time it guarantees that the tables are
+    // identical since it does the check
+    testSmallBatch();
+
+    // Take source and target tables snapshot
+    Path rootDir = FSUtils.getRootDir(conf1);
+    FileSystem fs = rootDir.getFileSystem(conf1);
+    String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+      new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+    // Take target snapshot
+    Path peerRootDir = FSUtils.getRootDir(conf2);
+    FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+    String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    String peerFSAddress = peerFs.getUri().toString();
+    String temPath1 = utility1.getRandomDir().toString();
+    String temPath2 = "/tmp2";
+
+    String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    Job job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+    Scan scan = new Scan();
+    ResultScanner rs = htable2.getScanner(scan);
+    Put put = null;
+    for (Result result : rs) {
+      put = new Put(result.getRow());
+      Cell firstVal = result.rawCells()[0];
+      put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+        Bytes.toBytes("diff data"));
+      htable2.put(put);
+    }
+    Delete delete = new Delete(put.getRow());
+    htable2.delete(delete);
+
+    sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+      new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+    peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+    SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+      new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+    args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+        "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+        "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+        "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+    job = new VerifyReplication().createSubmittableJob(conf1, args);
+    if (job == null) {
+      fail("Job wasn't created, see the log");
+    }
+    if (!job.waitForCompletion(true)) {
+      fail("Job failed, see the log");
+    }
+    assertEquals(0,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+    assertEquals(NB_ROWS_IN_BATCH,
+      job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+  }
 }