You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2016/09/29 06:56:49 UTC
[04/50] [abbrv] hbase git commit: HBASE-16423 Add re-compare option to VerifyReplication to avoid occasional inconsistent rows (Jianwei Cui)
HBASE-16423 Add re-compare option to VerifyReplication to avoid occasional inconsistent rows (Jianwei Cui)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/946c1ed8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/946c1ed8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/946c1ed8
Branch: refs/heads/hbase-14439
Commit: 946c1ed8f89967b1f036ee3b0dcc296082eee487
Parents: 191afc8
Author: tedyu <yu...@gmail.com>
Authored: Thu Sep 22 21:01:22 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Thu Sep 22 21:01:22 2016 -0700
----------------------------------------------------------------------
.../replication/VerifyReplication.java | 62 +++++++++++++++++---
1 file changed, 54 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/946c1ed8/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index c4dd3ad..04ae18f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
@@ -83,6 +85,7 @@ public class VerifyReplication extends Configured implements Tool {
static String delimiter = "";
static String peerId = null;
static String rowPrefixes = null;
+ static int sleepMsBeforeReCompare = 0;
private final static String JOB_NAME_CONF_KEY = "mapreduce.job.name";
@@ -97,10 +100,13 @@ public class VerifyReplication extends Configured implements Tool {
public static enum Counters {
GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
- private Connection connection;
+ private Connection sourceConnection;
+ private Table sourceTable;
+ private Connection replicatedConnection;
private Table replicatedTable;
private ResultScanner replicatedScanner;
private Result currentCompareRowInPeerTable;
+ private int sleepMsBeforeReCompare;
/**
* Map method that compares every scanned row with the equivalent from
@@ -116,6 +122,7 @@ public class VerifyReplication extends Configured implements Tool {
throws IOException {
if (replicatedScanner == null) {
Configuration conf = context.getConfiguration();
+ sleepMsBeforeReCompare = conf.getInt(NAME +".sleepMsBeforeReCompare", 0);
final Scan scan = new Scan();
scan.setBatch(batch);
scan.setCacheBlocks(false);
@@ -137,6 +144,9 @@ public class VerifyReplication extends Configured implements Tool {
if (versions >= 0) {
scan.setMaxVersions(versions);
}
+ TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
+ sourceConnection = ConnectionFactory.createConnection(conf);
+ sourceTable = sourceConnection.getTable(tableName);
final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
@@ -144,9 +154,8 @@ public class VerifyReplication extends Configured implements Tool {
Configuration peerConf = HBaseConfiguration.createClusterConf(conf,
zkClusterKey, PEER_CONFIG_PREFIX);
- TableName tableName = TableName.valueOf(conf.get(NAME + ".tableName"));
- connection = ConnectionFactory.createConnection(peerConf);
- replicatedTable = connection.getTable(tableName);
+ replicatedConnection = ConnectionFactory.createConnection(peerConf);
+ replicatedTable = replicatedConnection.getTable(tableName);
scan.setStartRow(value.getRow());
scan.setStopRow(tableSplit.getEndRow());
replicatedScanner = replicatedTable.getScanner(scan);
@@ -184,6 +193,18 @@ public class VerifyReplication extends Configured implements Tool {
}
private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
+ if (sleepMsBeforeReCompare > 0) {
+ Threads.sleep(sleepMsBeforeReCompare);
+ try {
+ Result sourceResult = sourceTable.get(new Get(row.getRow()));
+ Result replicatedResult = replicatedTable.get(new Get(row.getRow()));
+ Result.compareResults(sourceResult, replicatedResult);
+ return;
+ } catch (Exception e) {
+ LOG.error("recompare fail after sleep, rowkey=" + delimiter +
+ Bytes.toString(row.getRow()) + delimiter);
+ }
+ }
context.getCounter(counter).increment(1);
context.getCounter(Counters.BADROWS).increment(1);
LOG.error(counter.toString() + ", rowkey=" + delimiter + Bytes.toString(row.getRow()) +
@@ -206,18 +227,34 @@ public class VerifyReplication extends Configured implements Tool {
replicatedScanner = null;
}
}
+
+ if (sourceTable != null) {
+ try {
+ sourceTable.close();
+ } catch (IOException e) {
+ LOG.error("fail to close source table in cleanup", e);
+ }
+ }
+ if(sourceConnection != null){
+ try {
+ sourceConnection.close();
+ } catch (Exception e) {
+ LOG.error("fail to close source connection in cleanup", e);
+ }
+ }
+
if(replicatedTable != null){
try{
replicatedTable.close();
} catch (Exception e) {
- LOG.error("fail to close table in cleanup", e);
+ LOG.error("fail to close replicated table in cleanup", e);
}
}
- if(connection != null){
+ if(replicatedConnection != null){
try {
- connection.close();
+ replicatedConnection.close();
} catch (Exception e) {
- LOG.error("fail to close connection in cleanup", e);
+ LOG.error("fail to close replicated connection in cleanup", e);
}
}
}
@@ -273,6 +310,7 @@ public class VerifyReplication extends Configured implements Tool {
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
conf.setLong(NAME+".endTime", endTime);
+ conf.setInt(NAME +".sleepMsBeforeReCompare", sleepMsBeforeReCompare);
if (families != null) {
conf.set(NAME+".families", families);
}
@@ -408,6 +446,12 @@ public class VerifyReplication extends Configured implements Tool {
continue;
}
+ final String sleepToReCompareKey = "--recomparesleep=";
+ if (cmd.startsWith(sleepToReCompareKey)) {
+ sleepMsBeforeReCompare = Integer.parseInt(cmd.substring(sleepToReCompareKey.length()));
+ continue;
+ }
+
if (i == args.length-2) {
peerId = cmd;
}
@@ -453,6 +497,8 @@ public class VerifyReplication extends Configured implements Tool {
System.err.println(" families comma-separated list of families to copy");
System.err.println(" row-prefixes comma-separated list of row key prefixes to filter on ");
System.err.println(" delimiter the delimiter used in display around rowkey");
+ System.err.println(" recomparesleep milliseconds to sleep before recompare row, " +
+ "default value is 0 which disables the recompare.");
System.err.println();
System.err.println("Args:");
System.err.println(" peerid Id of the peer used for verification, must match the one given for replication");