You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/05/14 22:50:23 UTC
[hbase] branch branch-1.4 updated: HBASE-20305 Add option to
SyncTable that skip deletes on target cluster
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1.4 by this push:
new 90a7c77 HBASE-20305 Add option to SyncTable that skip deletes on target cluster
90a7c77 is described below
commit 90a7c77b4f680a51c10dabc2829bee1bed95aab3
Author: wellington <wc...@cloudera.com>
AuthorDate: Wed Mar 28 22:12:01 2018 +0100
HBASE-20305 Add option to SyncTable that skip deletes on target cluster
Change-Id: Iccbcd4a7e7ed176d8404cb2ab17e3e47663e0441
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../apache/hadoop/hbase/mapreduce/SyncTable.java | 40 ++-
.../hadoop/hbase/mapreduce/TestSyncTable.java | 280 +++++++++++++++++++--
2 files changed, 290 insertions(+), 30 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
index ddb169e..31d5172 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/SyncTable.java
@@ -63,7 +63,9 @@ public class SyncTable extends Configured implements Tool {
static final String TARGET_TABLE_CONF_KEY = "sync.table.target.table.name";
static final String SOURCE_ZK_CLUSTER_CONF_KEY = "sync.table.source.zk.cluster";
static final String TARGET_ZK_CLUSTER_CONF_KEY = "sync.table.target.zk.cluster";
- static final String DRY_RUN_CONF_KEY="sync.table.dry.run";
+ static final String DRY_RUN_CONF_KEY = "sync.table.dry.run";
+ static final String DO_DELETES_CONF_KEY = "sync.table.do.deletes";
+ static final String DO_PUTS_CONF_KEY = "sync.table.do.puts";
Path sourceHashDir;
String sourceTableName;
@@ -72,6 +74,8 @@ public class SyncTable extends Configured implements Tool {
String sourceZkCluster;
String targetZkCluster;
boolean dryRun;
+ boolean doDeletes = true;
+ boolean doPuts = true;
Counters counters;
@@ -143,6 +147,8 @@ public class SyncTable extends Configured implements Tool {
initCredentialsForHBase(targetZkCluster, job);
}
jobConf.setBoolean(DRY_RUN_CONF_KEY, dryRun);
+ jobConf.setBoolean(DO_DELETES_CONF_KEY, doDeletes);
+ jobConf.setBoolean(DO_PUTS_CONF_KEY, doPuts);
TableMapReduceUtil.initTableMapperJob(targetTableName, tableHash.initScan(),
SyncMapper.class, null, null, job);
@@ -177,6 +183,8 @@ public class SyncTable extends Configured implements Tool {
Table sourceTable;
Table targetTable;
boolean dryRun;
+ boolean doDeletes = true;
+ boolean doPuts = true;
HashTable.TableHash sourceTableHash;
HashTable.TableHash.Reader sourceHashReader;
@@ -200,7 +208,9 @@ public class SyncTable extends Configured implements Tool {
TableOutputFormat.OUTPUT_CONF_PREFIX);
sourceTable = openTable(sourceConnection, conf, SOURCE_TABLE_CONF_KEY);
targetTable = openTable(targetConnection, conf, TARGET_TABLE_CONF_KEY);
- dryRun = conf.getBoolean(SOURCE_TABLE_CONF_KEY, false);
+ dryRun = conf.getBoolean(DRY_RUN_CONF_KEY, false);
+ doDeletes = conf.getBoolean(DO_DELETES_CONF_KEY, true);
+ doPuts = conf.getBoolean(DO_PUTS_CONF_KEY, true);
sourceTableHash = HashTable.TableHash.read(conf, sourceHashDir);
LOG.info("Read source hash manifest: " + sourceTableHash);
@@ -487,7 +497,7 @@ public class SyncTable extends Configured implements Tool {
context.getCounter(Counter.TARGETMISSINGCELLS).increment(1);
matchingRow = false;
- if (!dryRun) {
+ if (!dryRun && doPuts) {
if (put == null) {
put = new Put(rowKey);
}
@@ -502,8 +512,8 @@ public class SyncTable extends Configured implements Tool {
context.getCounter(Counter.SOURCEMISSINGCELLS).increment(1);
matchingRow = false;
- if (!dryRun) {
- if (delete == null) {
+ if (!dryRun && doDeletes) {
+ if (delete == null) {
delete = new Delete(rowKey);
}
// add a tombstone to exactly match the target cell that is missing on the source
@@ -529,8 +539,8 @@ public class SyncTable extends Configured implements Tool {
context.getCounter(Counter.DIFFERENTCELLVALUES).increment(1);
matchingRow = false;
- if (!dryRun) {
- // overwrite target cell
+ if (!dryRun && doPuts) {
+ // overwrite target cell
if (put == null) {
put = new Put(rowKey);
}
@@ -709,6 +719,10 @@ public class SyncTable extends Configured implements Tool {
System.err.println(" (defaults to cluster in classpath's config)");
System.err.println(" dryrun if true, output counters but no writes");
System.err.println(" (defaults to false)");
+ System.err.println(" doDeletes if false, does not perform deletes");
+ System.err.println(" (defaults to true)");
+ System.err.println(" doPuts if false, does not perform puts ");
+ System.err.println(" (defaults to true)");
System.err.println();
System.err.println("Args:");
System.err.println(" sourcehashdir path to HashTable output dir for source table");
@@ -760,6 +774,18 @@ public class SyncTable extends Configured implements Tool {
continue;
}
+ final String doDeletesKey = "--doDeletes=";
+ if (cmd.startsWith(doDeletesKey)) {
+ doDeletes = Boolean.parseBoolean(cmd.substring(doDeletesKey.length()));
+ continue;
+ }
+
+ final String doPutsKey = "--doPuts=";
+ if (cmd.startsWith(doPutsKey)) {
+ doPuts = Boolean.parseBoolean(cmd.substring(doPutsKey.length()));
+ continue;
+ }
+
printUsage("Invalid argument '" + cmd + "'");
return false;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
index 1b356e6..b43f611 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSyncTable.java
@@ -70,6 +70,7 @@ public class TestSyncTable {
@AfterClass
public static void afterClass() throws Exception {
+ TEST_UTIL.cleanupDataTestDirOnTestFS();
TEST_UTIL.shutdownMiniCluster();
}
@@ -83,8 +84,8 @@ public class TestSyncTable {
@Test
public void testSyncTable() throws Exception {
- String sourceTableName = "testSourceTable";
- String targetTableName = "testTargetTable";
+ final TableName sourceTableName = TableName.valueOf("testSourceTable");
+ final TableName targetTableName = TableName.valueOf("testTargetTable");
Path testDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTable");
writeTestData(sourceTableName, targetTableName);
@@ -101,13 +102,58 @@ public class TestSyncTable {
TEST_UTIL.deleteTable(sourceTableName);
TEST_UTIL.deleteTable(targetTableName);
- TEST_UTIL.cleanupDataTestDirOnTestFS();
}
- private void assertEqualTables(int expectedRows, String sourceTableName, String targetTableName)
- throws Exception {
- Table sourceTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(sourceTableName));
- Table targetTable = TEST_UTIL.getConnection().getTable(TableName.valueOf(targetTableName));
+  /**
+   * Runs SyncTable with {@code --doDeletes=false} against freshly written test data, checks the
+   * resulting target table, then checks the diff counters reported by the job.
+   */
+  @Test
+  public void testSyncTableDoDeletesFalse() throws Exception {
+    final TableName sourceTableName = TableName.valueOf("testSyncTableDoDeletesFalse_source");
+    final TableName targetTableName = TableName.valueOf("testSyncTableDoDeletesFalse_target");
+    final Path hashDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoDeletesFalse");
+
+    writeTestData(sourceTableName, targetTableName);
+    hashSourceTable(sourceTableName, hashDir);
+    final Counters jobCounters =
+        syncTables(sourceTableName, targetTableName, hashDir, "--doDeletes=false");
+    assertTargetDoDeletesFalse(100, sourceTableName, targetTableName);
+
+    // Counter / expected value pairs, checked in this order.
+    final Counter[] checkedCounters = {
+      Counter.ROWSWITHDIFFS,
+      Counter.SOURCEMISSINGROWS,
+      Counter.TARGETMISSINGROWS,
+      Counter.SOURCEMISSINGCELLS,
+      Counter.TARGETMISSINGCELLS,
+      Counter.DIFFERENTCELLVALUES
+    };
+    final long[] expectedValues = { 60, 10, 10, 50, 50, 20 };
+    for (int i = 0; i < checkedCounters.length; i++) {
+      assertEquals(expectedValues[i], jobCounters.findCounter(checkedCounters[i]).getValue());
+    }
+
+    TEST_UTIL.deleteTable(sourceTableName);
+    TEST_UTIL.deleteTable(targetTableName);
+  }
+
+  /**
+   * Runs SyncTable with {@code --doPuts=false} against freshly written test data, checks the
+   * resulting target table, then checks the diff counters reported by the job.
+   */
+  @Test
+  public void testSyncTableDoPutsFalse() throws Exception {
+    final TableName sourceTableName = TableName.valueOf("testSyncTableDoPutsFalse_source");
+    final TableName targetTableName = TableName.valueOf("testSyncTableDoPutsFalse_target");
+    final Path hashDir = TEST_UTIL.getDataTestDirOnTestFS("testSyncTableDoPutsFalse");
+
+    writeTestData(sourceTableName, targetTableName);
+    hashSourceTable(sourceTableName, hashDir);
+    final Counters jobCounters =
+        syncTables(sourceTableName, targetTableName, hashDir, "--doPuts=false");
+    assertTargetDoPutsFalse(70, sourceTableName, targetTableName);
+
+    // Counter / expected value pairs, checked in this order.
+    final Counter[] checkedCounters = {
+      Counter.ROWSWITHDIFFS,
+      Counter.SOURCEMISSINGROWS,
+      Counter.TARGETMISSINGROWS,
+      Counter.SOURCEMISSINGCELLS,
+      Counter.TARGETMISSINGCELLS,
+      Counter.DIFFERENTCELLVALUES
+    };
+    final long[] expectedValues = { 60, 10, 10, 50, 50, 20 };
+    for (int i = 0; i < checkedCounters.length; i++) {
+      assertEquals(expectedValues[i], jobCounters.findCounter(checkedCounters[i]).getValue());
+    }
+
+    TEST_UTIL.deleteTable(sourceTableName);
+    TEST_UTIL.deleteTable(targetTableName);
+  }
+
+ private void assertEqualTables(int expectedRows, TableName sourceTableName,
+ TableName targetTableName) throws Exception {
+ Table sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
+ Table targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
ResultScanner sourceScanner = sourceTable.getScanner(new Scan());
ResultScanner targetScanner = targetTable.getScanner(new Scan());
@@ -180,21 +226,209 @@ public class TestSyncTable {
targetTable.close();
}
- private Counters syncTables(String sourceTableName, String targetTableName,
- Path testDir) throws Exception {
+  /**
+   * Verifies the target table after a sync run with {@code --doDeletes=false}.
+   * <p>
+   * Both tables are scanned side by side. Every target row is counted. A target row whose key
+   * differs from the current source row (or that is read after the source scanner is exhausted)
+   * is treated as a target-only row that was kept because deletes were disabled; it is skipped
+   * without advancing the source scanner. Rows present on both sides are compared cell by cell,
+   * iterating over the source row's cells.
+   *
+   * @param expectedRows number of rows the target scan is expected to return
+   * @param sourceTableName table used as the sync source
+   * @param targetTableName table that was synced
+   * @throws Exception if a table cannot be opened, scanned or closed
+   */
+  private void assertTargetDoDeletesFalse(int expectedRows, TableName sourceTableName,
+      TableName targetTableName) throws Exception {
+    Table sourceTable = null;
+    Table targetTable = null;
+    ResultScanner sourceScanner = null;
+    ResultScanner targetScanner = null;
+    try {
+      sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
+      targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
+      sourceScanner = sourceTable.getScanner(new Scan());
+      targetScanner = targetTable.getScanner(new Scan());
+
+      Result targetRow = targetScanner.next();
+      Result sourceRow = sourceScanner.next();
+      int rowsCount = 0;
+      while (targetRow != null) {
+        rowsCount++;
+        int targetRowKey = Bytes.toInt(targetRow.getRow());
+        // only compares values for existing rows, skipping rows existing on
+        // target only that were not deleted given --doDeletes=false.
+        // An exhausted source scanner means every remaining target row is target-only.
+        if (sourceRow == null || Bytes.toInt(sourceRow.getRow()) != targetRowKey) {
+          targetRow = targetScanner.next();
+          continue;
+        }
+
+        // From here on both rows are non-null and share the same key.
+        LOG.debug("SOURCE row: " + targetRowKey + " cells:" + sourceRow);
+        LOG.debug("TARGET row: " + targetRowKey + " cells:" + targetRow);
+
+        Cell[] sourceCells = sourceRow.rawCells();
+        Cell[] targetCells = targetRow.rawCells();
+        if (targetRowKey >= 70 && targetRowKey < 80) {
+          // Rejecting "fewer" as well as "equal" matches the failure message and keeps the
+          // per-cell loop below from indexing past the end of targetCells.
+          if (targetCells.length <= sourceCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            Assert.fail("Row " + targetRowKey + " should have more cells in "
+                + "target than in source");
+          }
+        } else {
+          if (sourceCells.length != targetCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            Assert.fail("Row " + targetRowKey
+                + " has " + sourceCells.length
+                + " cells in source table but " + targetCells.length
+                + " cells in target table");
+          }
+        }
+        for (int j = 0; j < sourceCells.length; j++) {
+          Cell sourceCell = sourceCells[j];
+          Cell targetCell = targetCells[j];
+          try {
+            if (!CellUtil.matchingRow(sourceCell, targetCell)) {
+              Assert.fail("Rows don't match");
+            }
+            if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
+              Assert.fail("Families don't match");
+            }
+            if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
+              Assert.fail("Qualifiers don't match");
+            }
+            // Timestamps are compared for every row except 80-89. The previous condition
+            // (key < 80 && key >= 90) could never be true, so this check never ran.
+            // NOTE(review): rows 80-89 presumably keep a newer target version because nothing
+            // is deleted -- confirm against writeTestData.
+            if (targetRowKey < 80 || targetRowKey >= 90) {
+              if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
+                Assert.fail("Timestamps don't match");
+              }
+            }
+            if (!CellUtil.matchingValue(sourceCell, targetCell)) {
+              Assert.fail("Values don't match");
+            }
+          } catch (Throwable t) {
+            LOG.debug("Source cell: " + sourceCell + " target cell: "
+                + targetCell);
+            Throwables.propagate(t);
+          }
+        }
+        targetRow = targetScanner.next();
+        sourceRow = sourceScanner.next();
+      }
+      assertEquals("Target expected rows does not match.", expectedRows,
+          rowsCount);
+    } finally {
+      // Release scanners and tables even when one of the assertions above failed.
+      if (sourceScanner != null) {
+        sourceScanner.close();
+      }
+      if (targetScanner != null) {
+        targetScanner.close();
+      }
+      if (sourceTable != null) {
+        sourceTable.close();
+      }
+      if (targetTable != null) {
+        targetTable.close();
+      }
+    }
+  }
+
+  /**
+   * Verifies the target table after a sync run with {@code --doPuts=false}.
+   * <p>
+   * Both tables are scanned side by side. While the current source row's key differs from the
+   * current target row's key, the source scanner is advanced: such rows are treated as
+   * source-only rows that were not copied because puts were disabled. Once the keys match, the
+   * row is checked according to its key range (see the branches below) and counted.
+   *
+   * @param expectedRows number of matched rows the target scan is expected to yield
+   * @param sourceTableName table used as the sync source
+   * @param targetTableName table that was synced
+   * @throws Exception if a table cannot be opened, scanned or closed
+   */
+  private void assertTargetDoPutsFalse(int expectedRows, TableName sourceTableName,
+      TableName targetTableName) throws Exception {
+    Table sourceTable = null;
+    Table targetTable = null;
+    ResultScanner sourceScanner = null;
+    ResultScanner targetScanner = null;
+    try {
+      sourceTable = TEST_UTIL.getConnection().getTable(sourceTableName);
+      targetTable = TEST_UTIL.getConnection().getTable(targetTableName);
+      sourceScanner = sourceTable.getScanner(new Scan());
+      targetScanner = targetTable.getScanner(new Scan());
+
+      Result targetRow = targetScanner.next();
+      Result sourceRow = sourceScanner.next();
+      int rowsCount = 0;
+
+      while (targetRow != null) {
+        int targetRowKey = Bytes.toInt(targetRow.getRow());
+        // The source scanner ran out while a target row is still unmatched. Report that
+        // explicitly instead of failing with a NullPointerException on sourceRow.
+        if (sourceRow == null) {
+          Assert.fail("Target row " + targetRowKey + " has no matching row in source table");
+        }
+        // only compares values for existing rows, skipping rows existing on
+        // source only that were not added to target given --doPuts=false
+        if (Bytes.toInt(sourceRow.getRow()) != targetRowKey) {
+          sourceRow = sourceScanner.next();
+          continue;
+        }
+
+        // From here on both rows are non-null and share the same key.
+        LOG.debug("SOURCE row: " + targetRowKey + " cells:" + sourceRow);
+        LOG.debug("TARGET row: " + targetRowKey + " cells:" + targetRow);
+
+        LOG.debug("rowsCount: " + rowsCount);
+
+        Cell[] sourceCells = sourceRow.rawCells();
+        Cell[] targetCells = targetRow.rawCells();
+        if (targetRowKey >= 40 && targetRowKey < 60) {
+          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+          LOG.debug("Target cells: " + Arrays.toString(targetCells));
+          Assert.fail("There shouldn't exist any rows between 40 and 60, since "
+              + "Puts are disabled and Deletes are enabled.");
+        } else if (targetRowKey >= 60 && targetRowKey < 70) {
+          if (sourceCells.length == targetCells.length) {
+            LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+            LOG.debug("Target cells: " + Arrays.toString(targetCells));
+            Assert.fail("Row " + targetRowKey
+                + " shouldn't have same number of cells.");
+          }
+        } else if (targetRowKey >= 80 && targetRowKey < 90) {
+          LOG.debug("Source cells: " + Arrays.toString(sourceCells));
+          LOG.debug("Target cells: " + Arrays.toString(targetCells));
+          Assert.fail("There should be no rows between 80 and 90 on target, as "
+              + "these had different timestamps and should had been deleted.");
+        } else if (targetRowKey >= 90 && targetRowKey < 100) {
+          // With puts disabled the differing target values must not have been overwritten.
+          for (int j = 0; j < sourceCells.length; j++) {
+            Cell sourceCell = sourceCells[j];
+            Cell targetCell = targetCells[j];
+            if (CellUtil.matchingValue(sourceCell, targetCell)) {
+              Assert.fail("Cells values should not match for rows between "
+                  + "90 and 100. Target row id: " + targetRowKey);
+            }
+          }
+        } else {
+          // Every other row is expected to be identical on both sides.
+          for (int j = 0; j < sourceCells.length; j++) {
+            Cell sourceCell = sourceCells[j];
+            Cell targetCell = targetCells[j];
+            try {
+              if (!CellUtil.matchingRow(sourceCell, targetCell)) {
+                Assert.fail("Rows don't match");
+              }
+              if (!CellUtil.matchingFamily(sourceCell, targetCell)) {
+                Assert.fail("Families don't match");
+              }
+              if (!CellUtil.matchingQualifier(sourceCell, targetCell)) {
+                Assert.fail("Qualifiers don't match");
+              }
+              if (!CellUtil.matchingTimestamp(sourceCell, targetCell)) {
+                Assert.fail("Timestamps don't match");
+              }
+              if (!CellUtil.matchingValue(sourceCell, targetCell)) {
+                Assert.fail("Values don't match");
+              }
+            } catch (Throwable t) {
+              LOG.debug(
+                  "Source cell: " + sourceCell + " target cell: " + targetCell);
+              Throwables.propagate(t);
+            }
+          }
+        }
+        rowsCount++;
+        targetRow = targetScanner.next();
+        sourceRow = sourceScanner.next();
+      }
+      assertEquals("Target expected rows does not match.", expectedRows,
+          rowsCount);
+    } finally {
+      // Release scanners and tables even when one of the assertions above failed.
+      if (sourceScanner != null) {
+        sourceScanner.close();
+      }
+      if (targetScanner != null) {
+        targetScanner.close();
+      }
+      if (sourceTable != null) {
+        sourceTable.close();
+      }
+      if (targetTable != null) {
+        targetTable.close();
+      }
+    }
+  }
+
+ private Counters syncTables(TableName sourceTableName, TableName targetTableName,
+ Path testDir, String... options) throws Exception {
SyncTable syncTable = new SyncTable(TEST_UTIL.getConfiguration());
- int code = syncTable.run(new String[] {
- testDir.toString(),
- sourceTableName,
- targetTableName
- });
+ String[] args = Arrays.copyOf(options, options.length+3);
+ args[options.length] = testDir.toString();
+ args[options.length+1] = sourceTableName.getNameAsString();
+ args[options.length+2] = targetTableName.getNameAsString();
+ int code = syncTable.run(args);
assertEquals("sync table job failed", 0, code);
LOG.info("Sync tables completed");
return syncTable.counters;
}
- private void hashSourceTable(String sourceTableName, Path testDir)
+ private void hashSourceTable(TableName sourceTableName, Path testDir)
throws Exception, IOException {
int numHashFiles = 3;
long batchSize = 100; // should be 2 batches per region
@@ -204,14 +438,14 @@ public class TestSyncTable {
"--batchsize=" + batchSize,
"--numhashfiles=" + numHashFiles,
"--scanbatch=" + scanBatch,
- sourceTableName,
+ sourceTableName.getNameAsString(),
testDir.toString()});
assertEquals("hash table job failed", 0, code);
FileSystem fs = TEST_UTIL.getTestFileSystem();
HashTable.TableHash tableHash = HashTable.TableHash.read(fs.getConf(), testDir);
- assertEquals(sourceTableName, tableHash.tableName);
+ assertEquals(sourceTableName.getNameAsString(), tableHash.tableName);
assertEquals(batchSize, tableHash.batchSize);
assertEquals(numHashFiles, tableHash.numHashFiles);
assertEquals(numHashFiles - 1, tableHash.partitions.size());
@@ -219,7 +453,7 @@ public class TestSyncTable {
LOG.info("Hash table completed");
}
- private void writeTestData(String sourceTableName, String targetTableName)
+ private void writeTestData(TableName sourceTableName, TableName targetTableName)
throws Exception {
final byte[] family = Bytes.toBytes("family");
final byte[] column1 = Bytes.toBytes("c1");
@@ -231,12 +465,12 @@ public class TestSyncTable {
int numRows = 100;
int sourceRegions = 10;
int targetRegions = 6;
-
- HTable sourceTable = TEST_UTIL.createTable(TableName.valueOf(sourceTableName),
- family, generateSplits(numRows, sourceRegions));
- HTable targetTable = TEST_UTIL.createTable(TableName.valueOf(targetTableName),
- family, generateSplits(numRows, targetRegions));
+ Table sourceTable = TEST_UTIL.createTable(sourceTableName,
+ family, generateSplits(numRows, sourceRegions));
+
+ Table targetTable = TEST_UTIL.createTable(targetTableName,
+ family, generateSplits(numRows, targetRegions));
long timestamp = 1430764183454L;