You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/05/03 23:03:24 UTC
hbase git commit: HBASE-16466 Snapshots support in VerifyReplication
tool
Repository: hbase
Updated Branches:
refs/heads/master 78f6799f4 -> 2de6b051f
HBASE-16466 Snapshots support in VerifyReplication tool
Signed-off-by: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2de6b051
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2de6b051
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2de6b051
Branch: refs/heads/master
Commit: 2de6b051f67b6a55eda8d4e247328fda24484adb
Parents: 78f6799
Author: Sukumar Maddineni <su...@gmail.com>
Authored: Wed May 3 13:35:26 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Wed May 3 14:22:18 2017 -0700
----------------------------------------------------------------------
.../mapreduce/TableSnapshotInputFormat.java | 5 +
.../replication/VerifyReplication.java | 199 +++++++++++++++----
.../replication/TestReplicationSmallTests.java | 123 +++++++++++-
3 files changed, 282 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2de6b051/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
index 15d403f..a5f699d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormat.java
@@ -120,6 +120,11 @@ public class TableSnapshotInputFormat extends InputFormat<ImmutableBytesWritable
public void readFields(DataInput in) throws IOException {
delegate.readFields(in);
}
+
+ public HRegionInfo getRegionInfo() {
+ return delegate.getRegionInfo();
+ }
+
}
@VisibleForTesting
http://git-wip-us.apache.org/repos/asf/hbase/blob/2de6b051/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 3f8317b..3d32edd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -25,6 +25,8 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
@@ -36,6 +38,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableSnapshotScanner;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
@@ -43,6 +46,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@@ -50,14 +54,19 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This map-only job compares the data from a local table with a remote one.
* Every cell is compared and must have exactly the same keys (even timestamp)
@@ -75,18 +84,31 @@ public class VerifyReplication extends Configured implements Tool {
public final static String NAME = "verifyrep";
private final static String PEER_CONFIG_PREFIX = NAME + ".peer.";
- static long startTime = 0;
- static long endTime = Long.MAX_VALUE;
- static int batch = -1;
- static int versions = -1;
- static String tableName = null;
- static String families = null;
- static String delimiter = "";
- static String peerId = null;
- static String rowPrefixes = null;
- static int sleepMsBeforeReCompare = 0;
- static boolean verbose = false;
- static boolean includeDeletedCells = false;
+ long startTime = 0;
+ long endTime = Long.MAX_VALUE;
+ int batch = -1;
+ int versions = -1;
+ String tableName = null;
+ String families = null;
+ String delimiter = "";
+ String peerId = null;
+ String rowPrefixes = null;
+ int sleepMsBeforeReCompare = 0;
+ boolean verbose = false;
+ boolean includeDeletedCells = false;
+ //Source table snapshot name
+ String sourceSnapshotName = null;
+ //Temp location in source cluster to restore source snapshot
+ String sourceSnapshotTmpDir = null;
+ //Peer table snapshot name
+ String peerSnapshotName = null;
+ //Temp location in peer cluster to restore peer snapshot
+ String peerSnapshotTmpDir = null;
+ //Peer cluster Hadoop FS address
+ String peerFSAddress = null;
+ //Peer cluster HBase root dir location
+ String peerHBaseRootAddress = null;
+
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
@@ -159,7 +181,7 @@ public class VerifyReplication extends Configured implements Tool {
sourceConnection = ConnectionFactory.createConnection(conf);
sourceTable = sourceConnection.getTable(tableName);
- final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
+ final InputSplit tableSplit = context.getInputSplit();
String zkClusterKey = conf.get(NAME + ".peerQuorumAddress");
Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
@@ -168,8 +190,33 @@ public class VerifyReplication extends Configured implements Tool {
replicatedConnection = ConnectionFactory.createConnection(peerConf);
replicatedTable = replicatedConnection.getTable(tableName);
scan.setStartRow(value.getRow());
- scan.setStopRow(tableSplit.getEndRow());
- replicatedScanner = replicatedTable.getScanner(scan);
+
+ byte[] endRow = null;
+ if (tableSplit instanceof TableSnapshotInputFormat.TableSnapshotRegionSplit) {
+ endRow = ((TableSnapshotInputFormat.TableSnapshotRegionSplit) tableSplit).getRegionInfo()
+ .getEndKey();
+ } else {
+ endRow = ((TableSplit) tableSplit).getEndRow();
+ }
+
+ scan.setStopRow(endRow);
+
+ String peerSnapshotName = conf.get(NAME + ".peerSnapshotName", null);
+ if (peerSnapshotName != null) {
+ String peerSnapshotTmpDir = conf.get(NAME + ".peerSnapshotTmpDir", null);
+ String peerFSAddress = conf.get(NAME + ".peerFSAddress", null);
+ String peerHBaseRootAddress = conf.get(NAME + ".peerHBaseRootAddress", null);
+ FileSystem.setDefaultUri(peerConf, peerFSAddress);
+ FSUtils.setRootDir(peerConf, new Path(peerHBaseRootAddress));
+ LOG.info("Using peer snapshot:" + peerSnapshotName + " with temp dir:"
+ + peerSnapshotTmpDir + " peer root uri:" + FSUtils.getRootDir(peerConf)
+ + " peerFSAddress:" + peerFSAddress);
+
+ replicatedScanner = new TableSnapshotScanner(peerConf,
+ new Path(peerFSAddress, peerSnapshotTmpDir), peerSnapshotName, scan);
+ } else {
+ replicatedScanner = replicatedTable.getScanner(scan);
+ }
currentCompareRowInPeerTable = replicatedScanner.next();
}
while (true) {
@@ -282,7 +329,7 @@ public class VerifyReplication extends Configured implements Tool {
}
private static Pair<ReplicationPeerConfig, Configuration> getPeerQuorumConfig(
- final Configuration conf) throws IOException {
+ final Configuration conf, String peerId) throws IOException {
ZooKeeperWatcher localZKW = null;
ReplicationPeerZKImpl peer = null;
try {
@@ -322,7 +369,7 @@ public class VerifyReplication extends Configured implements Tool {
* @return The newly created job.
* @throws java.io.IOException When setting up the job fails.
*/
- public static Job createSubmittableJob(Configuration conf, String[] args)
+ public Job createSubmittableJob(Configuration conf, String[] args)
throws IOException {
if (!doCommandLine(args)) {
return null;
@@ -343,7 +390,7 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".rowPrefixes", rowPrefixes);
}
- Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf);
+ Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
String peerQuorumAddress = peerConfig.getClusterKey();
LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
@@ -355,6 +402,17 @@ public class VerifyReplication extends Configured implements Tool {
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);
+ //Set Snapshot specific parameters
+ if (peerSnapshotName != null) {
+ conf.set(NAME + ".peerSnapshotName", peerSnapshotName);
+ conf.set(NAME + ".peerSnapshotTmpDir", peerSnapshotTmpDir);
+ conf.set(NAME + ".peerFSAddress", peerFSAddress);
+ conf.set(NAME + ".peerHBaseRootAddress", peerHBaseRootAddress);
+
+ // This is to create HDFS delegation token for peer cluster in case of secured
+ conf.setStrings(MRJobConfig.JOB_NAMENODES, peerFSAddress);
+ }
+
Job job = Job.getInstance(conf, conf.get(JOB_NAME_CONF_KEY, NAME + "_" + tableName));
job.setJarByClass(VerifyReplication.class);
@@ -378,9 +436,15 @@ public class VerifyReplication extends Configured implements Tool {
setRowPrefixFilter(scan, rowPrefixes);
- TableMapReduceUtil.initTableMapperJob(tableName, scan,
- Verifier.class, null, null, job);
-
+ if (sourceSnapshotName != null) {
+ Path snapshotTempPath = new Path(sourceSnapshotTmpDir);
+ LOG.info(
+ "Using source snapshot-" + sourceSnapshotName + " with temp dir:" + sourceSnapshotTmpDir);
+ TableMapReduceUtil.initTableSnapshotMapperJob(sourceSnapshotName, scan, Verifier.class, null,
+ null, job, true, snapshotTempPath);
+ } else {
+ TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
+ }
Configuration peerClusterConf = peerConfigPair.getSecond();
// Obtain the auth token from peer cluster
TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
@@ -413,15 +477,12 @@ public class VerifyReplication extends Configured implements Tool {
scan.setStopRow(stopRow);
}
- private static boolean doCommandLine(final String[] args) {
+ @VisibleForTesting
+ public boolean doCommandLine(final String[] args) {
if (args.length < 2) {
printUsage(null);
return false;
}
- //in case we've been run before, restore all parameters to their initial states
- //Otherwise, if our previous run included a parameter not in args this time,
- //we might hold on to the old value.
- restoreDefaults();
try {
for (int i = 0; i < args.length; i++) {
String cmd = args[i];
@@ -487,10 +548,47 @@ public class VerifyReplication extends Configured implements Tool {
if (cmd.startsWith(verboseKey)) {
verbose = true;
continue;
- }
+ }
+
+ final String sourceSnapshotNameArgKey = "--sourceSnapshotName=";
+ if (cmd.startsWith(sourceSnapshotNameArgKey)) {
+ sourceSnapshotName = cmd.substring(sourceSnapshotNameArgKey.length());
+ continue;
+ }
+
+ final String sourceSnapshotTmpDirArgKey = "--sourceSnapshotTmpDir=";
+ if (cmd.startsWith(sourceSnapshotTmpDirArgKey)) {
+ sourceSnapshotTmpDir = cmd.substring(sourceSnapshotTmpDirArgKey.length());
+ continue;
+ }
+
+ final String peerSnapshotNameArgKey = "--peerSnapshotName=";
+ if (cmd.startsWith(peerSnapshotNameArgKey)) {
+ peerSnapshotName = cmd.substring(peerSnapshotNameArgKey.length());
+ continue;
+ }
+
+ final String peerSnapshotTmpDirArgKey = "--peerSnapshotTmpDir=";
+ if (cmd.startsWith(peerSnapshotTmpDirArgKey)) {
+ peerSnapshotTmpDir = cmd.substring(peerSnapshotTmpDirArgKey.length());
+ continue;
+ }
+
+ final String peerFSAddressArgKey = "--peerFSAddress=";
+ if (cmd.startsWith(peerFSAddressArgKey)) {
+ peerFSAddress = cmd.substring(peerFSAddressArgKey.length());
+ continue;
+ }
+
+ final String peerHBaseRootAddressArgKey = "--peerHBaseRootAddress=";
+ if (cmd.startsWith(peerHBaseRootAddressArgKey)) {
+ peerHBaseRootAddress = cmd.substring(peerHBaseRootAddressArgKey.length());
+ continue;
+ }
if (cmd.startsWith("--")) {
printUsage("Invalid argument '" + cmd + "'");
+ return false;
}
if (i == args.length-2) {
@@ -501,6 +599,32 @@ public class VerifyReplication extends Configured implements Tool {
tableName = cmd;
}
}
+
+ if ((sourceSnapshotName != null && sourceSnapshotTmpDir == null)
+ || (sourceSnapshotName == null && sourceSnapshotTmpDir != null)) {
+ printUsage("Source snapshot name and snapshot temp location should be provided"
+ + " to use snapshots in source cluster");
+ return false;
+ }
+
+ if (peerSnapshotName != null || peerSnapshotTmpDir != null || peerFSAddress != null
+ || peerHBaseRootAddress != null) {
+ if (peerSnapshotName == null || peerSnapshotTmpDir == null || peerFSAddress == null
+ || peerHBaseRootAddress == null) {
+ printUsage(
+ "Peer snapshot name, peer snapshot temp location, Peer HBase root address and "
+ + "peer FSAddress should be provided to use snapshots in peer cluster");
+ return false;
+ }
+ }
+
+ // This is to avoid making recompare calls to source/peer tables when snapshots are used
+ if ((sourceSnapshotName != null || peerSnapshotName != null) && sleepMsBeforeReCompare > 0) {
+ printUsage(
+ "Using sleepMsBeforeReCompare along with snapshots is not allowed as snapshots are immutable");
+ return false;
+ }
+
} catch (Exception e) {
e.printStackTrace();
printUsage("Can't start because " + e.getMessage());
@@ -509,18 +633,6 @@ public class VerifyReplication extends Configured implements Tool {
return true;
}
- private static void restoreDefaults() {
- startTime = 0;
- endTime = Long.MAX_VALUE;
- batch = -1;
- versions = -1;
- tableName = null;
- families = null;
- peerId = null;
- rowPrefixes = null;
- includeDeletedCells = false;
- }
-
/*
* @param errorMsg Error message. Can be null.
*/
@@ -530,7 +642,8 @@ public class VerifyReplication extends Configured implements Tool {
}
System.err.println("Usage: verifyrep [--starttime=X]" +
" [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
- "[--batch=] [--verbose] <peerid> <tablename>");
+ "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
+ + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
System.err.println();
System.err.println("Options:");
System.err.println(" starttime beginning of the time range");
@@ -546,6 +659,12 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
"default value is 0 which disables the recompare.");
System.err.println(" verbose logs row keys of good rows");
+ System.err.println(" sourceSnapshotName Source Snapshot Name");
+ System.err.println(" sourceSnapshotTmpDir Tmp location to restore source table snapshot");
+ System.err.println(" peerSnapshotName Peer Snapshot Name");
+ System.err.println(" peerSnapshotTmpDir Tmp location to restore peer table snapshot");
+ System.err.println(" peerFSAddress Peer cluster Hadoop FS address");
+ System.err.println(" peerHBaseRootAddress Peer cluster HBase root location");
System.err.println();
System.err.println("Args:");
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
http://git-wip-us.apache.org/repos/asf/hbase/blob/2de6b051/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 1c5a994..f1b2015 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -18,10 +18,7 @@
package org.apache.hadoop.hbase.replication;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
@@ -33,6 +30,8 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -59,10 +58,12 @@ import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -73,6 +74,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import com.google.common.collect.Lists;
+
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSmallTests extends TestReplicationBase {
@@ -593,7 +596,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
throws IOException, InterruptedException, ClassNotFoundException {
- Job job = VerifyReplication.createSubmittableJob(new Configuration(CONF_WITH_LOCALFS), args);
+ Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
if (job == null) {
fail("Job wasn't created, see the log");
}
@@ -864,4 +867,114 @@ public class TestReplicationSmallTests extends TestReplicationBase {
runVerifyReplication(args, NB_ROWS_IN_BATCH *2, 0);
}
+ @Test(timeout = 300000)
+ public void testVerifyReplicationSnapshotArguments() {
+ String[] args =
+ new String[] { "--sourceSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--sourceSnapshotTmpDir=tmp", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=tmp", "2",
+ tableName.getNameAsString() };
+ assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--peerSnapshotName=snapshot1", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--peerSnapshotTmpDir=/tmp/", "2", tableName.getNameAsString() };
+ assertFalse(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--peerSnapshotName=snapshot1", "--peerSnapshotTmpDir=/tmp/",
+ "--peerFSAddress=tempfs", "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2",
+ tableName.getNameAsString() };
+ assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+
+ args = new String[] { "--sourceSnapshotName=snapshot1", "--sourceSnapshotTmpDir=/tmp/",
+ "--peerSnapshotName=snapshot2", "--peerSnapshotTmpDir=/tmp/", "--peerFSAddress=tempfs",
+ "--peerHBaseRootAddress=hdfs://tempfs:50070/hbase/", "2", tableName.getNameAsString() };
+
+ assertTrue(Lists.newArrayList(args).toString(), new VerifyReplication().doCommandLine(args));
+ }
+
+ @Test(timeout = 300000)
+ public void testVerifyReplicationWithSnapshotSupport() throws Exception {
+ // Populate the tables, at the same time it guarantees that the tables are
+ // identical since it does the check
+ testSmallBatch();
+
+ // Take source and target tables snapshot
+ Path rootDir = FSUtils.getRootDir(conf1);
+ FileSystem fs = rootDir.getFileSystem(conf1);
+ String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+ new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+ // Take target snapshot
+ Path peerRootDir = FSUtils.getRootDir(conf2);
+ FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+ String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+ new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+ String peerFSAddress = peerFs.getUri().toString();
+ String temPath1 = utility1.getRandomDir().toString();
+ String temPath2 = "/tmp2";
+
+ String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+ "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+ Job job = new VerifyReplication().createSubmittableJob(conf1, args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(NB_ROWS_IN_BATCH,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(0,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+
+ Scan scan = new Scan();
+ ResultScanner rs = htable2.getScanner(scan);
+ Put put = null;
+ for (Result result : rs) {
+ put = new Put(result.getRow());
+ Cell firstVal = result.rawCells()[0];
+ put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+ Bytes.toBytes("diff data"));
+ htable2.put(put);
+ }
+ Delete delete = new Delete(put.getRow());
+ htable2.delete(delete);
+
+ sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility1.getHBaseAdmin(), tableName,
+ new String(famName), sourceSnapshotName, rootDir, fs, true);
+
+ peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility2.getHBaseAdmin(), tableName,
+ new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+ args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+ "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+
+ job = new VerifyReplication().createSubmittableJob(conf1, args);
+ if (job == null) {
+ fail("Job wasn't created, see the log");
+ }
+ if (!job.waitForCompletion(true)) {
+ fail("Job failed, see the log");
+ }
+ assertEquals(0,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
+ assertEquals(NB_ROWS_IN_BATCH,
+ job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+ }
}