You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by br...@apache.org on 2019/02/10 07:17:59 UTC
[hbase] branch branch-2.1 updated: HBASE-21201 Support to run
VerifyReplication MR tool without peerid
This is an automated email from the ASF dual-hosted git repository.
brfrn169 pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 5464ed0 HBASE-21201 Support to run VerifyReplication MR tool without peerid
5464ed0 is described below
commit 5464ed0f9e16d917484ccb10998a99fa4b2197f0
Author: Toshihiro Suzuki <br...@gmail.com>
AuthorDate: Fri Jan 25 00:24:39 2019 +0900
HBASE-21201 Support to run VerifyReplication MR tool without peerid
Signed-off-by: Josh Elser <el...@apache.org>
---
.../mapreduce/replication/VerifyReplication.java | 57 +++++++---
.../hbase/replication/TestVerifyReplication.java | 123 ++++++++++++++++-----
2 files changed, 138 insertions(+), 42 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 85eebc5..ead64c9 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
@@ -96,6 +97,7 @@ public class VerifyReplication extends Configured implements Tool {
String families = null;
String delimiter = "";
String peerId = null;
+ String peerQuorumAddress = null;
String rowPrefixes = null;
int sleepMsBeforeReCompare = 0;
boolean verbose = false;
@@ -385,7 +387,6 @@ public class VerifyReplication extends Configured implements Tool {
if (!doCommandLine(args)) {
return null;
}
- conf.set(NAME+".peerId", peerId);
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
conf.setLong(NAME+".endTime", endTime);
@@ -401,14 +402,23 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".rowPrefixes", rowPrefixes);
}
- Pair<ReplicationPeerConfig, Configuration> peerConfigPair = getPeerQuorumConfig(conf, peerId);
- ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
- String peerQuorumAddress = peerConfig.getClusterKey();
- LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
+ String peerQuorumAddress;
+ Pair<ReplicationPeerConfig, Configuration> peerConfigPair = null;
+ if (peerId != null) {
+ peerConfigPair = getPeerQuorumConfig(conf, peerId);
+ ReplicationPeerConfig peerConfig = peerConfigPair.getFirst();
+ peerQuorumAddress = peerConfig.getClusterKey();
+ LOG.info("Peer Quorum Address: " + peerQuorumAddress + ", Peer Configuration: " +
peerConfig.getConfiguration());
- conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
- HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
+ conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+ HBaseConfiguration.setWithPrefix(conf, PEER_CONFIG_PREFIX,
peerConfig.getConfiguration().entrySet());
+ } else {
+ assert this.peerQuorumAddress != null;
+ peerQuorumAddress = this.peerQuorumAddress;
+ LOG.info("Peer Quorum Address: " + peerQuorumAddress);
+ conf.set(NAME + ".peerQuorumAddress", peerQuorumAddress);
+ }
conf.setInt(NAME + ".versions", versions);
LOG.info("Number of version: " + versions);
@@ -463,9 +473,13 @@ public class VerifyReplication extends Configured implements Tool {
} else {
TableMapReduceUtil.initTableMapperJob(tableName, scan, Verifier.class, null, null, job);
}
- Configuration peerClusterConf = peerConfigPair.getSecond();
- // Obtain the auth token from peer cluster
- TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
+
+ if (peerId != null) {
+ assert peerConfigPair != null;
+ Configuration peerClusterConf = peerConfigPair.getSecond();
+ // Obtain the auth token from peer cluster
+ TableMapReduceUtil.initCredentialsForCluster(job, peerClusterConf);
+ }
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
@@ -610,7 +624,11 @@ public class VerifyReplication extends Configured implements Tool {
}
if (i == args.length-2) {
- peerId = cmd;
+ if (isPeerQuorumAddress(cmd)) {
+ peerQuorumAddress = cmd;
+ } else {
+ peerId = cmd;
+ }
}
if (i == args.length-1) {
@@ -651,6 +669,16 @@ public class VerifyReplication extends Configured implements Tool {
return true;
}
+ private boolean isPeerQuorumAddress(String cmd) {
+ try {
+ ZKConfig.validateClusterKey(cmd);
+ } catch (IOException e) {
+ // not a quorum address
+ return false;
+ }
+ return true;
+ }
+
/*
* @param errorMsg Error message. Can be null.
*/
@@ -660,8 +688,9 @@ public class VerifyReplication extends Configured implements Tool {
}
System.err.println("Usage: verifyrep [--starttime=X]" +
" [--endtime=Y] [--families=A] [--row-prefixes=B] [--delimiter=] [--recomparesleep=] " +
- "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] [--peerSnapshotName=R] "
- + "[--peerSnapshotTmpDir=S] [--peerFSAddress=T] [--peerHBaseRootAddress=U] <peerid> <tablename>");
+ "[--batch=] [--verbose] [--sourceSnapshotName=P] [--sourceSnapshotTmpDir=Q] "
+ + "[--peerSnapshotName=R] [--peerSnapshotTmpDir=S] [--peerFSAddress=T] "
+ + "[--peerHBaseRootAddress=U] <peerid|peerQuorumAddress> <tablename>");
System.err.println();
System.err.println("Options:");
System.err.println(" starttime beginning of the time range");
@@ -686,6 +715,8 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println();
System.err.println("Args:");
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");
+ System.err.println(" peerQuorumAddress quorumAdress of the peer used for verification. The "
+ + "format is zk_quorum:zk_port:zk_hbase_path");
System.err.println(" tablename Name of the table to verify");
System.err.println();
System.err.println("Examples:");
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
index e1fda4e..8b52390 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
@@ -422,36 +422,24 @@ public class TestVerifyReplication extends TestReplicationBase {
FileSystem fs = rootDir.getFileSystem(conf1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
- new String(famName), sourceSnapshotName, rootDir, fs, true);
+ Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
Path peerRootDir = FSUtils.getRootDir(conf2);
FileSystem peerFs = peerRootDir.getFileSystem(conf2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
- new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+ Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
String temPath1 = utility1.getRandomDir().toString();
- String temPath2 = "/tmp2";
+ String temPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
-
- Job job = new VerifyReplication().createSubmittableJob(conf1, args);
- if (job == null) {
- fail("Job wasn't created, see the log");
- }
- if (!job.waitForCompletion(true)) {
- fail("Job failed, see the log");
- }
- assertEquals(NB_ROWS_IN_BATCH,
- job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
- assertEquals(0,
- job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
-
+ runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
checkRestoreTmpDir(conf1, temPath1, 1);
checkRestoreTmpDir(conf2, temPath2, 1);
@@ -470,30 +458,107 @@ public class TestVerifyReplication extends TestReplicationBase {
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
- new String(famName), sourceSnapshotName, rootDir, fs, true);
+ Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
- new String(famName), peerSnapshotName, peerRootDir, peerFs, true);
+ Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
"--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+ runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+ checkRestoreTmpDir(conf1, temPath1, 2);
+ checkRestoreTmpDir(conf2, temPath2, 2);
+ }
- job = new VerifyReplication().createSubmittableJob(conf1, args);
- if (job == null) {
- fail("Job wasn't created, see the log");
+ @Test
+ public void testVerifyRepJobWithQuorumAddress() throws Exception {
+ // Populate the tables, at the same time it guarantees that the tables are
+ // identical since it does the check
+ runSmallBatchTest();
+
+ // with a quorum address (a cluster key)
+ String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() };
+ runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+
+ Scan scan = new Scan();
+ ResultScanner rs = htable2.getScanner(scan);
+ Put put = null;
+ for (Result result : rs) {
+ put = new Put(result.getRow());
+ Cell firstVal = result.rawCells()[0];
+ put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+ Bytes.toBytes("diff data"));
+ htable2.put(put);
}
- if (!job.waitForCompletion(true)) {
- fail("Job failed, see the log");
+ Delete delete = new Delete(put.getRow());
+ htable2.delete(delete);
+ runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+ }
+
+ @Test
+ public void testVerifyRepJobWithQuorumAddressAndSnapshotSupport() throws Exception {
+ // Populate the tables, at the same time it guarantees that the tables are
+ // identical since it does the check
+ runSmallBatchTest();
+
+ // Take source and target tables snapshot
+ Path rootDir = FSUtils.getRootDir(conf1);
+ FileSystem fs = rootDir.getFileSystem(conf1);
+ String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
+
+ // Take target snapshot
+ Path peerRootDir = FSUtils.getRootDir(conf2);
+ FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+ String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+ Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+ String peerFSAddress = peerFs.getUri().toString();
+ String tmpPath1 = utility1.getRandomDir().toString();
+ String tmpPath2 = "/tmp" + System.currentTimeMillis();
+
+ String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+ "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+ tableName.getNameAsString() };
+ runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
+ checkRestoreTmpDir(conf1, tmpPath1, 1);
+ checkRestoreTmpDir(conf2, tmpPath2, 1);
+
+ Scan scan = new Scan();
+ ResultScanner rs = htable2.getScanner(scan);
+ Put put = null;
+ for (Result result : rs) {
+ put = new Put(result.getRow());
+ Cell firstVal = result.rawCells()[0];
+ put.addColumn(CellUtil.cloneFamily(firstVal), CellUtil.cloneQualifier(firstVal),
+ Bytes.toBytes("diff data"));
+ htable2.put(put);
}
- assertEquals(0,
- job.getCounters().findCounter(VerifyReplication.Verifier.Counters.GOODROWS).getValue());
- assertEquals(NB_ROWS_IN_BATCH,
- job.getCounters().findCounter(VerifyReplication.Verifier.Counters.BADROWS).getValue());
+ Delete delete = new Delete(put.getRow());
+ htable2.delete(delete);
- checkRestoreTmpDir(conf1, temPath1, 2);
- checkRestoreTmpDir(conf2, temPath2, 2);
+ sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
+
+ peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
+ SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+ Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
+
+ args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
+ "--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+ tableName.getNameAsString() };
+ runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
+ checkRestoreTmpDir(conf1, tmpPath1, 2);
+ checkRestoreTmpDir(conf2, tmpPath2, 2);
}
}