Posted to commits@hbase.apache.org by te...@apache.org on 2014/10/03 05:27:36 UTC
git commit: HBASE-10153 improve VerifyReplication to compute BADROWS more accurately (Jianwei)
Repository: hbase
Updated Branches:
refs/heads/0.98 93871c670 -> a3cfd5233
HBASE-10153 improve VerifyReplication to compute BADROWS more accurately (Jianwei)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a3cfd523
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a3cfd523
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a3cfd523
Branch: refs/heads/0.98
Commit: a3cfd5233dfbfdd57ac445acd0886df2f8bae895
Parents: 93871c6
Author: Ted Yu <te...@apache.org>
Authored: Fri Oct 3 03:27:24 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Fri Oct 3 03:27:24 2014 +0000
----------------------------------------------------------------------
.../replication/VerifyReplication.java | 67 ++++++++++++++++----
1 file changed, 55 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
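
Before this change, the peer-side scanner was opened at the first mapped row and simply advanced once per source row, so a row present in only one cluster threw every subsequent comparison off and inflated BADROWS. The patch below bounds the peer scan by the TableSplit's start and end rows and merge-walks the two sorted row streams, classifying each mismatch as ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS or CONTENT_DIFFERENT_ROWS while still incrementing the aggregate BADROWS. As a rough sketch of how a driver could read the new counters after the job completes, reusing the same createSubmittableJob entry point the tool's main() uses (the peer id "1" and table name "usertable" are placeholders, and the wiring is illustrative, not part of this commit):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication.Verifier.Counters;
import org.apache.hadoop.mapreduce.Job;

public class VerifyReplicationCountersExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // "1" (replication peer id) and "usertable" are placeholders for this sketch
    Job job = VerifyReplication.createSubmittableJob(conf, new String[] { "1", "usertable" });
    if (job != null && job.waitForCompletion(true)) {
      org.apache.hadoop.mapreduce.Counters counters = job.getCounters();
      // GOODROWS, BADROWS plus the three new breakdown counters added by this patch
      for (Counters c : Counters.values()) {
        System.out.println(c + " = " + counters.findCounter(c).getValue());
      }
    }
  }
}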
http://git-wip-us.apache.org/repos/asf/hbase/blob/a3cfd523/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index bb17e64..1088688 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
@@ -78,9 +79,11 @@ public class VerifyReplication extends Configured implements Tool {
   public static class Verifier
       extends TableMapper<ImmutableBytesWritable, Put> {
 
-    public static enum Counters {GOODROWS, BADROWS}
+    public static enum Counters {
+      GOODROWS, BADROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS}
 
     private ResultScanner replicatedScanner;
+    private Result currentCompareRowInPeerTable;
 
     /**
      * Map method that compares every scanned row with the equivalent from
@@ -111,6 +114,8 @@ public class VerifyReplication extends Configured implements Tool {
         if (versions >= 0) {
           scan.setMaxVersions(versions);
         }
+
+        final TableSplit tableSplit = (TableSplit)(context.getInputSplit());
         HConnectionManager.execute(new HConnectable<Void>(conf) {
           @Override
           public Void connect(HConnection conn) throws IOException {
@@ -119,26 +124,64 @@ public class VerifyReplication extends Configured implements Tool {
             ZKUtil.applyClusterKeyToConf(peerConf, zkClusterKey);
 
             HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
-            scan.setStartRow(value.getRow());
+            scan.setStartRow(tableSplit.getStartRow());
+            scan.setStopRow(tableSplit.getEndRow());
             replicatedScanner = replicatedTable.getScanner(scan);
             return null;
           }
         });
+        currentCompareRowInPeerTable = replicatedScanner.next();
       }
-      Result res = replicatedScanner.next();
-      try {
-        Result.compareResults(value, res);
-        context.getCounter(Counters.GOODROWS).increment(1);
-      } catch (Exception e) {
-        LOG.warn("Bad row", e);
-        context.getCounter(Counters.BADROWS).increment(1);
+      while (true) {
+        if (currentCompareRowInPeerTable == null) {
+          // reach the region end of peer table, row only in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        }
+        int rowCmpRet = Bytes.compareTo(value.getRow(), currentCompareRowInPeerTable.getRow());
+        if (rowCmpRet == 0) {
+          // rowkey is same, need to compare the content of the row
+          try {
+            Result.compareResults(value, currentCompareRowInPeerTable);
+            context.getCounter(Counters.GOODROWS).increment(1);
+          } catch (Exception e) {
+            logFailRowAndIncreaseCounter(context, Counters.CONTENT_DIFFERENT_ROWS, value);
+          }
+          currentCompareRowInPeerTable = replicatedScanner.next();
+          break;
+        } else if (rowCmpRet < 0) {
+          // row only exists in source table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_SOURCE_TABLE_ROWS, value);
+          break;
+        } else {
+          // row only exists in peer table
+          logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+            currentCompareRowInPeerTable);
+          currentCompareRowInPeerTable = replicatedScanner.next();
+        }
       }
     }
-
+
+    private void logFailRowAndIncreaseCounter(Context context, Counters counter, Result row) {
+      context.getCounter(counter).increment(1);
+      context.getCounter(Counters.BADROWS).increment(1);
+      LOG.error(counter.toString() + ", rowkey=" + Bytes.toString(row.getRow()));
+    }
+
     protected void cleanup(Context context) {
       if (replicatedScanner != null) {
-        replicatedScanner.close();
-        replicatedScanner = null;
+        try {
+          while (currentCompareRowInPeerTable != null) {
+            logFailRowAndIncreaseCounter(context, Counters.ONLY_IN_PEER_TABLE_ROWS,
+              currentCompareRowInPeerTable);
+            currentCompareRowInPeerTable = replicatedScanner.next();
+          }
+        } catch (Exception e) {
+          LOG.error("fail to scan peer table in cleanup", e);
+        } finally {
+          replicatedScanner.close();
+          replicatedScanner = null;
+        }
+      }
     }
   }
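
For reference, here is the comparison loop in isolation: a small stand-alone Java sketch (no HBase types; String[] rows with the rowkey in element 0 stand in for Results, and the class and method names are made up for illustration) that mirrors the mapper's merge-walk and the cleanup() drain of leftover peer-only rows.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Stand-alone illustration only: each row is a String[] whose first element is the
// rowkey and whose remaining elements are the cell values.
public class MergeCompareSketch {

  enum Outcome { GOODROWS, ONLY_IN_SOURCE_TABLE_ROWS, ONLY_IN_PEER_TABLE_ROWS, CONTENT_DIFFERENT_ROWS }

  static void verify(Iterator<String[]> source, Iterator<String[]> peer) {
    String[] peerRow = peer.hasNext() ? peer.next() : null;
    while (source.hasNext()) {                        // one iteration per source row, like map()
      String[] srcRow = source.next();
      while (true) {
        if (peerRow == null) {                        // peer scan exhausted: row only in source
          report(Outcome.ONLY_IN_SOURCE_TABLE_ROWS, srcRow[0]);
          break;
        }
        int cmp = srcRow[0].compareTo(peerRow[0]);
        if (cmp == 0) {                               // same rowkey: compare the row contents
          report(Arrays.equals(srcRow, peerRow)
              ? Outcome.GOODROWS : Outcome.CONTENT_DIFFERENT_ROWS, srcRow[0]);
          peerRow = peer.hasNext() ? peer.next() : null;
          break;
        } else if (cmp < 0) {                         // source key sorts first: only in source
          report(Outcome.ONLY_IN_SOURCE_TABLE_ROWS, srcRow[0]);
          break;
        } else {                                      // peer key sorts first: only in peer
          report(Outcome.ONLY_IN_PEER_TABLE_ROWS, peerRow[0]);
          peerRow = peer.hasNext() ? peer.next() : null;
        }
      }
    }
    while (peerRow != null) {                         // cleanup(): drain leftover peer-only rows
      report(Outcome.ONLY_IN_PEER_TABLE_ROWS, peerRow[0]);
      peerRow = peer.hasNext() ? peer.next() : null;
    }
  }

  static void report(Outcome outcome, String rowkey) {
    System.out.println(outcome + ", rowkey=" + rowkey);
  }

  public static void main(String[] args) {
    List<String[]> source = Arrays.asList(
        new String[] {"row1", "v1"}, new String[] {"row2", "v2"}, new String[] {"row4", "v4"});
    List<String[]> peer = Arrays.asList(
        new String[] {"row1", "v1"}, new String[] {"row2", "vX"}, new String[] {"row3", "v3"});
    verify(source.iterator(), peer.iterator());
    // prints: GOODROWS row1, CONTENT_DIFFERENT_ROWS row2, ONLY_IN_PEER_TABLE_ROWS row3,
    // ONLY_IN_SOURCE_TABLE_ROWS row4
  }
}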