You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/06/04 00:24:50 UTC
[hbase] branch branch-2 updated: HBASE-22524 Refactor
TestReplicationSyncUpTool
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 09465f4 HBASE-22524 Refactor TestReplicationSyncUpTool
09465f4 is described below
commit 09465f48a819da44d0e9a1cf36bee70bf5f40c10
Author: zhangduo <zh...@apache.org>
AuthorDate: Mon Jun 3 20:47:32 2019 +0800
HBASE-22524 Refactor TestReplicationSyncUpTool
---
...estReplicationSyncUpToolWithBulkLoadedData.java | 135 +++++----
.../hbase/replication/TestVerifyReplication.java | 128 ++++----
.../TestReplicationAdminWithClusters.java | 14 +-
.../replication/TestNamespaceReplication.java | 4 +-
.../hbase/replication/TestReplicationBase.java | 123 ++++----
.../TestReplicationChangingPeerRegionservers.java | 10 +-
.../TestReplicationDisableInactivePeer.java | 4 +-
.../replication/TestReplicationDroppedTables.java | 50 ++--
.../TestReplicationEmptyWALRecovery.java | 22 +-
.../hbase/replication/TestReplicationEndpoint.java | 95 +++---
.../replication/TestReplicationKillMasterRS.java | 2 +-
.../TestReplicationKillMasterRSCompressed.java | 2 +-
...ReplicationKillMasterRSWithSeparateOldWALs.java | 4 +-
.../hbase/replication/TestReplicationKillRS.java | 6 +-
.../replication/TestReplicationKillSlaveRS.java | 2 +-
...tReplicationKillSlaveRSWithSeparateOldWALs.java | 4 +-
.../replication/TestReplicationMetricsforUI.java | 6 +-
.../replication/TestReplicationSmallTests.java | 12 +-
.../hbase/replication/TestReplicationStatus.java | 8 +-
.../TestReplicationStatusAfterLagging.java | 10 +-
...licationStatusBothNormalAndRecoveryLagging.java | 8 +-
...ationStatusSourceStartedTargetStoppedNewOp.java | 8 +-
...ationStatusSourceStartedTargetStoppedNoOps.java | 8 +-
...atusSourceStartedTargetStoppedWithRecovery.java | 8 +-
.../replication/TestReplicationSyncUpTool.java | 324 ++++++---------------
.../replication/TestReplicationSyncUpToolBase.java | 141 +++++++++
...estReplicationEndpointWithMultipleAsyncWAL.java | 4 +-
.../TestReplicationEndpointWithMultipleWAL.java | 4 +-
...KillMasterRSCompressedWithMultipleAsyncWAL.java | 4 +-
...ationKillMasterRSCompressedWithMultipleWAL.java | 4 +-
...tReplicationSyncUpToolWithMultipleAsyncWAL.java | 12 +-
.../TestReplicationSyncUpToolWithMultipleWAL.java | 14 +-
.../replication/regionserver/TestReplicator.java | 22 +-
33 files changed, 588 insertions(+), 614 deletions(-)
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 96010d9..278e6c5 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
+import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
+import static org.apache.hadoop.hbase.replication.TestReplicationBase.row;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
@@ -26,6 +29,7 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -36,41 +40,39 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
+import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
-public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
+public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpToolBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);
+ HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithBulkLoadedData.class);
- private static final Logger LOG = LoggerFactory
- .getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestReplicationSyncUpToolWithBulkLoadedData.class);
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
- conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
- conf1.set("hbase.replication.source.fs.conf.provider",
+ @Override
+ protected void customizeClusterConf(Configuration conf) {
+ conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+ conf.set("hbase.replication.source.fs.conf.provider",
TestSourceFSConfigurationProvider.class.getCanonicalName());
- String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ String classes = conf.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
- conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
}
-
- TestReplicationBase.setUpBeforeClass();
}
- @Override
+ @Test
public void testSyncUpTool() throws Exception {
/**
* Set up Replication: on Master and one Slave Table: t1_syncup and t2_syncup columnfamily:
@@ -84,7 +86,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
Iterator<String> randomHFileRangeListIterator = null;
Set<String> randomHFileRanges = new HashSet<>(16);
for (int i = 0; i < 16; i++) {
- randomHFileRanges.add(utility1.getRandomUUID().toString());
+ randomHFileRanges.add(UTIL1.getRandomUUID().toString());
}
List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
Collections.sort(randomHFileRangeList);
@@ -112,58 +114,58 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
throws Exception {
LOG.debug("mimicSyncUpAfterBulkLoad");
- utility2.shutdownMiniHBaseCluster();
+ UTIL2.shutdownMiniHBaseCluster();
loadAndReplicateHFiles(false, randomHFileRangeListIterator);
- int rowCount_ht1Source = utility1.countRows(ht1Source);
+ int rowCount_ht1Source = UTIL1.countRows(ht1Source);
assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
rowCount_ht1Source);
- int rowCount_ht2Source = utility1.countRows(ht2Source);
+ int rowCount_ht2Source = UTIL1.countRows(ht2Source);
assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
rowCount_ht2Source);
- utility1.shutdownMiniHBaseCluster();
- utility2.restartHBaseCluster(1);
+ UTIL1.shutdownMiniHBaseCluster();
+ UTIL2.restartHBaseCluster(1);
Thread.sleep(SLEEP_TIME);
// Before sync up
- int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
- int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
- assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
- assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+ int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+ int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
+ assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
// Run sync up tool
- syncUp(utility1);
+ syncUp(UTIL1);
// After syun up
for (int i = 0; i < NB_RETRIES; i++) {
- syncUp(utility1);
- rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
- rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ syncUp(UTIL1);
+ rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+ rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
- if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
+ if (rowCountHt1TargetAtPeer1 != 200 || rowCountHt2TargetAtPeer1 != 400) {
// syncUP still failed. Let's look at the source in case anything wrong there
- utility1.restartHBaseCluster(1);
- rowCount_ht1Source = utility1.countRows(ht1Source);
+ UTIL1.restartHBaseCluster(1);
+ rowCount_ht1Source = UTIL1.countRows(ht1Source);
LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
- rowCount_ht2Source = utility1.countRows(ht2Source);
+ rowCount_ht2Source = UTIL1.countRows(ht2Source);
LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
}
assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
- rowCount_ht1TargetAtPeer1);
+ rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
- rowCount_ht2TargetAtPeer1);
+ rowCountHt2TargetAtPeer1);
}
- if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
+ if (rowCountHt1TargetAtPeer1 == 200 && rowCountHt2TargetAtPeer1 == 400) {
LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
break;
} else {
- LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
- + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
- + rowCount_ht2TargetAtPeer1);
+ LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i +
+ ", with rowCount_ht1TargetPeer1 =" + rowCountHt1TargetAtPeer1 +
+ " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
}
Thread.sleep(SLEEP_TIME);
}
@@ -175,44 +177,42 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
// Load 100 + 3 hfiles to t1_syncup.
byte[][][] hfileRanges =
- new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
- Bytes.toBytes(randomHFileRangeListIterator.next()) } };
- loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
- 100);
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht1Source, hfileRanges, 100);
hfileRanges =
- new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
- Bytes.toBytes(randomHFileRangeListIterator.next()) } };
- loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht1Source,
hfileRanges, 3);
// Load 200 + 3 hfiles to t2_syncup.
hfileRanges =
- new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
- Bytes.toBytes(randomHFileRangeListIterator.next()) } };
- loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
- 200);
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, FAMILY, ht2Source, hfileRanges, 200);
hfileRanges =
- new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
- Bytes.toBytes(randomHFileRangeListIterator.next()) } };
- loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
+ new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+ Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+ loadAndValidateHFileReplication("HFileReplication_1", row, NO_REP_FAMILY, ht2Source,
hfileRanges, 3);
if (verifyReplicationOnSlave) {
// ensure replication completed
- wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
+ wait(ht1TargetAtPeer1, UTIL1.countRows(ht1Source) - 3,
"t1_syncup has 103 rows on source, and 100 on slave1");
- wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
+ wait(ht2TargetAtPeer1, UTIL1.countRows(ht2Source) - 3,
"t2_syncup has 203 rows on source, and 200 on slave1");
}
}
private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
- Path dir = utility1.getDataTestDirOnTestFS(testName);
- FileSystem fs = utility1.getTestFileSystem();
+ Path dir = UTIL1.getDataTestDirOnTestFS(testName);
+ FileSystem fs = UTIL1.getTestFileSystem();
dir = dir.makeQualified(fs);
Path familyDir = new Path(dir, Bytes.toString(fam));
@@ -220,24 +220,23 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
- HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
- + hfileIdx++), fam, row, from, to, numOfRows);
+ HFileTestUtil.createHFile(UTIL1.getConfiguration(), fs,
+ new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
}
final TableName tableName = source.getName();
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
- String[] args = { dir.toString(), tableName.toString() };
- loader.run(args);
+ BulkLoadHFiles loader = BulkLoadHFiles.create(UTIL1.getConfiguration());
+ loader.bulkLoad(tableName, dir);
}
- private void wait(Table target, int expectedCount, String msg) throws IOException,
- InterruptedException {
+ private void wait(Table target, int expectedCount, String msg)
+ throws IOException, InterruptedException {
for (int i = 0; i < NB_RETRIES; i++) {
- int rowCount_ht2TargetAtPeer1 = utility2.countRows(target);
+ int rowCountHt2TargetAtPeer1 = UTIL2.countRows(target);
if (i == NB_RETRIES - 1) {
- assertEquals(msg, expectedCount, rowCount_ht2TargetAtPeer1);
+ assertEquals(msg, expectedCount, rowCountHt2TargetAtPeer1);
}
- if (expectedCount == rowCount_ht2TargetAtPeer1) {
+ if (expectedCount == rowCountHt2TargetAtPeer1) {
break;
}
Thread.sleep(SLEEP_TIME);
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
index 7d12cbc..7771c0a 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/replication/TestVerifyReplication.java
@@ -27,8 +27,6 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.TreeMap;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -92,7 +90,7 @@ public class TestVerifyReplication extends TestReplicationBase {
@Before
public void setUp() throws Exception {
cleanUp();
- utility2.deleteTableData(peerTableName);
+ UTIL2.deleteTableData(peerTableName);
}
@BeforeClass
@@ -103,7 +101,7 @@ public class TestVerifyReplication extends TestReplicationBase {
ColumnFamilyDescriptorBuilder.newBuilder(noRepfamName).setMaxVersions(100)
.build()).build();
- Connection connection2 = ConnectionFactory.createConnection(conf2);
+ Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(peerTable, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
@@ -112,7 +110,7 @@ public class TestVerifyReplication extends TestReplicationBase {
private void runVerifyReplication(String[] args, int expectedGoodRows, int expectedBadRows)
throws IOException, InterruptedException, ClassNotFoundException {
- Job job = new VerifyReplication().createSubmittableJob(new Configuration(conf1), args);
+ Job job = new VerifyReplication().createSubmittableJob(new Configuration(CONF1), args);
if (job == null) {
fail("Job wasn't created, see the log");
}
@@ -174,24 +172,20 @@ public class TestVerifyReplication extends TestReplicationBase {
.setMaxVersions(100).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build();
TableDescriptor table =
TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(fam).build();
- scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
- scopes.put(f.getName(), f.getScope());
- }
- Connection connection1 = ConnectionFactory.createConnection(conf1);
- Connection connection2 = ConnectionFactory.createConnection(conf2);
+ Connection connection1 = ConnectionFactory.createConnection(CONF1);
+ Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
- utility1.waitUntilAllRegionsAssigned(tableName);
- utility2.waitUntilAllRegionsAssigned(tableName);
+ UTIL1.waitUntilAllRegionsAssigned(tableName);
+ UTIL2.waitUntilAllRegionsAssigned(tableName);
- lHtable1 = utility1.getConnection().getTable(tableName);
- lHtable2 = utility2.getConnection().getTable(tableName);
+ lHtable1 = UTIL1.getConnection().getTable(tableName);
+ lHtable2 = UTIL2.getConnection().getTable(tableName);
Put put = new Put(row);
put.addColumn(familyname, row, row);
@@ -442,30 +436,30 @@ public class TestVerifyReplication extends TestReplicationBase {
runSmallBatchTest();
// Take source and target tables snapshot
- Path rootDir = FSUtils.getRootDir(conf1);
- FileSystem fs = rootDir.getFileSystem(conf1);
+ Path rootDir = FSUtils.getRootDir(CONF1);
+ FileSystem fs = rootDir.getFileSystem(CONF1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
- Path peerRootDir = FSUtils.getRootDir(conf2);
- FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+ Path peerRootDir = FSUtils.getRootDir(CONF2);
+ FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
- String temPath1 = utility1.getRandomDir().toString();
+ String temPath1 = UTIL1.getRandomDir().toString();
String temPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
- "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
- "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
- "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+ "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
- checkRestoreTmpDir(conf1, temPath1, 1);
- checkRestoreTmpDir(conf2, temPath2, 1);
+ checkRestoreTmpDir(CONF1, temPath1, 1);
+ checkRestoreTmpDir(CONF2, temPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
@@ -481,20 +475,20 @@ public class TestVerifyReplication extends TestReplicationBase {
htable2.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
- "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
- "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
- "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), "2", tableName.getNameAsString() };
+ "--sourceSnapshotTmpDir=" + temPath1, "--peerSnapshotName=" + peerSnapshotName,
+ "--peerSnapshotTmpDir=" + temPath2, "--peerFSAddress=" + peerFSAddress,
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), "2", tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
- checkRestoreTmpDir(conf1, temPath1, 2);
- checkRestoreTmpDir(conf2, temPath2, 2);
+ checkRestoreTmpDir(CONF1, temPath1, 2);
+ checkRestoreTmpDir(CONF2, temPath2, 2);
}
@Test
@@ -504,7 +498,7 @@ public class TestVerifyReplication extends TestReplicationBase {
runSmallBatchTest();
// with a quorum address (a cluster key)
- String[] args = new String[] { utility2.getClusterKey(), tableName.getNameAsString() };
+ String[] args = new String[] { UTIL2.getClusterKey(), tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
Scan scan = new Scan();
@@ -529,31 +523,31 @@ public class TestVerifyReplication extends TestReplicationBase {
runSmallBatchTest();
// Take source and target tables snapshot
- Path rootDir = FSUtils.getRootDir(conf1);
- FileSystem fs = rootDir.getFileSystem(conf1);
+ Path rootDir = FSUtils.getRootDir(CONF1);
+ FileSystem fs = rootDir.getFileSystem(CONF1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
- Path peerRootDir = FSUtils.getRootDir(conf2);
- FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+ Path peerRootDir = FSUtils.getRootDir(CONF2);
+ FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
- String tmpPath1 = utility1.getRandomDir().toString();
+ String tmpPath1 = UTIL1.getRandomDir().toString();
String tmpPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
- "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
- checkRestoreTmpDir(conf1, tmpPath1, 1);
- checkRestoreTmpDir(conf2, tmpPath2, 1);
+ checkRestoreTmpDir(CONF1, tmpPath1, 1);
+ checkRestoreTmpDir(CONF2, tmpPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable2.getScanner(scan);
@@ -569,21 +563,21 @@ public class TestVerifyReplication extends TestReplicationBase {
htable2.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(famName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), tableName,
Bytes.toString(famName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
- "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
- checkRestoreTmpDir(conf1, tmpPath1, 2);
- checkRestoreTmpDir(conf2, tmpPath2, 2);
+ checkRestoreTmpDir(CONF1, tmpPath1, 2);
+ checkRestoreTmpDir(CONF2, tmpPath2, 2);
}
private static void runBatchCopyTest() throws Exception {
@@ -621,10 +615,10 @@ public class TestVerifyReplication extends TestReplicationBase {
// with a peerTableName along with quorum address (a cluster key)
String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
- utility2.getClusterKey(), tableName.getNameAsString() };
+ UTIL2.getClusterKey(), tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
- utility2.deleteTableData(peerTableName);
+ UTIL2.deleteTableData(peerTableName);
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
}
@@ -634,32 +628,32 @@ public class TestVerifyReplication extends TestReplicationBase {
runBatchCopyTest();
// Take source and target tables snapshot
- Path rootDir = FSUtils.getRootDir(conf1);
- FileSystem fs = rootDir.getFileSystem(conf1);
+ Path rootDir = FSUtils.getRootDir(CONF1);
+ FileSystem fs = rootDir.getFileSystem(CONF1);
String sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
// Take target snapshot
- Path peerRootDir = FSUtils.getRootDir(conf2);
- FileSystem peerFs = peerRootDir.getFileSystem(conf2);
+ Path peerRootDir = FSUtils.getRootDir(CONF2);
+ FileSystem peerFs = peerRootDir.getFileSystem(CONF2);
String peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
String peerFSAddress = peerFs.getUri().toString();
- String tmpPath1 = utility1.getRandomDir().toString();
+ String tmpPath1 = UTIL1.getRandomDir().toString();
String tmpPath2 = "/tmp" + System.currentTimeMillis();
String[] args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
"--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
- "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, NB_ROWS_IN_BATCH, 0);
- checkRestoreTmpDir(conf1, tmpPath1, 1);
- checkRestoreTmpDir(conf2, tmpPath2, 1);
+ checkRestoreTmpDir(CONF1, tmpPath1, 1);
+ checkRestoreTmpDir(CONF2, tmpPath2, 1);
Scan scan = new Scan();
ResultScanner rs = htable3.getScanner(scan);
@@ -675,22 +669,22 @@ public class TestVerifyReplication extends TestReplicationBase {
htable3.delete(delete);
sourceSnapshotName = "sourceSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility1.getAdmin(), tableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL1.getAdmin(), tableName,
Bytes.toString(noRepfamName), sourceSnapshotName, rootDir, fs, true);
peerSnapshotName = "peerSnapshot-" + System.currentTimeMillis();
- SnapshotTestingUtils.createSnapshotAndValidate(utility2.getAdmin(), peerTableName,
+ SnapshotTestingUtils.createSnapshotAndValidate(UTIL2.getAdmin(), peerTableName,
Bytes.toString(noRepfamName), peerSnapshotName, peerRootDir, peerFs, true);
args = new String[] { "--peerTableName=" + peerTableName.getNameAsString(),
"--sourceSnapshotName=" + sourceSnapshotName,
"--sourceSnapshotTmpDir=" + tmpPath1, "--peerSnapshotName=" + peerSnapshotName,
"--peerSnapshotTmpDir=" + tmpPath2, "--peerFSAddress=" + peerFSAddress,
- "--peerHBaseRootAddress=" + FSUtils.getRootDir(conf2), utility2.getClusterKey(),
+ "--peerHBaseRootAddress=" + FSUtils.getRootDir(CONF2), UTIL2.getClusterKey(),
tableName.getNameAsString() };
runVerifyReplication(args, 0, NB_ROWS_IN_BATCH);
- checkRestoreTmpDir(conf1, tmpPath1, 2);
- checkRestoreTmpDir(conf2, tmpPath2, 2);
+ checkRestoreTmpDir(CONF1, tmpPath1, 2);
+ checkRestoreTmpDir(CONF2, tmpPath2, 2);
}
@AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index 0ff757e..dc2b3a7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -74,11 +74,11 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
- connection1 = ConnectionFactory.createConnection(conf1);
- connection2 = ConnectionFactory.createConnection(conf2);
+ connection1 = ConnectionFactory.createConnection(CONF1);
+ connection2 = ConnectionFactory.createConnection(CONF2);
admin1 = connection1.getAdmin();
admin2 = connection2.getAdmin();
- adminExt = new ReplicationAdmin(conf1);
+ adminExt = new ReplicationAdmin(CONF1);
}
@AfterClass
@@ -199,8 +199,8 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
}
} finally {
- utility1.deleteTable(tn);
- utility2.deleteTable(tn);
+ UTIL1.deleteTable(tn);
+ UTIL2.deleteTable(tn);
}
}
@@ -273,7 +273,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
public void testReplicationPeerConfigUpdateCallback() throws Exception {
String peerId = "1";
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
- rpc.setClusterKey(utility2.getClusterKey());
+ rpc.setClusterKey(UTIL2.getClusterKey());
rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
rpc.getConfiguration().put("key1", "value1");
@@ -325,7 +325,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
@Override
public UUID getPeerUUID() {
- return utility1.getRandomUUID();
+ return UTIL1.getRandomUUID();
}
@Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
index d8a02c7..7dcdf8c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -101,8 +101,8 @@ public class TestNamespaceReplication extends TestReplicationBase {
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
- connection1 = ConnectionFactory.createConnection(conf1);
- connection2 = ConnectionFactory.createConnection(conf2);
+ connection1 = ConnectionFactory.createConnection(CONF1);
+ connection2 = ConnectionFactory.createConnection(CONF2);
admin1 = connection1.getAdmin();
admin2 = connection2.getAdmin();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index 798d0fe..e87b076 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -25,16 +25,12 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.NavigableMap;
-import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.StartMiniClusterOption;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -51,7 +47,6 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -68,22 +63,19 @@ import org.slf4j.LoggerFactory;
public class TestReplicationBase {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationBase.class);
- protected static Configuration conf1 = HBaseConfiguration.create();
- protected static Configuration conf2;
protected static Configuration CONF_WITH_LOCALFS;
- protected static ZKWatcher zkw1;
- protected static ZKWatcher zkw2;
-
protected static ReplicationAdmin admin;
protected static Admin hbaseAdmin;
protected static Table htable1;
protected static Table htable2;
- protected static NavigableMap<byte[], Integer> scopes;
- protected static HBaseTestingUtility utility1;
- protected static HBaseTestingUtility utility2;
+ protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+ protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+ protected static final Configuration CONF1 = UTIL1.getConfiguration();
+ protected static final Configuration CONF2 = UTIL2.getConfiguration();
+
protected static final int NUM_SLAVES1 = 2;
protected static final int NUM_SLAVES2 = 4;
protected static final int NB_ROWS_IN_BATCH = 100;
@@ -105,12 +97,12 @@ public class TestReplicationBase {
protected final void cleanUp() throws IOException, InterruptedException {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
- for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+ for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
- utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+ UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
- int rowCount = utility1.countRows(tableName);
- utility1.deleteTableData(tableName);
+ int rowCount = UTIL1.countRows(tableName);
+ UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
@@ -172,92 +164,85 @@ public class TestReplicationBase {
htable1.put(puts);
}
- protected static void configureClusters(){
- conf1.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/1");
+ private static void setupConfig(HBaseTestingUtility util, String znodeParent) {
+ Configuration conf = util.getConfiguration();
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
// We don't want too many edits per batch sent to the ReplicationEndpoint to trigger
// sufficient number of events. But we don't want to go too low because
// HBaseInterClusterReplicationEndpoint partitions entries into batches and we want
// more than one batch sent to the peer cluster for better testing.
- conf1.setInt("replication.source.size.capacity", 102400);
- conf1.setLong("replication.source.sleepforretries", 100);
- conf1.setInt("hbase.regionserver.maxlogs", 10);
- conf1.setLong("hbase.master.logcleaner.ttl", 10);
- conf1.setInt("zookeeper.recovery.retry", 1);
- conf1.setInt("zookeeper.recovery.retry.intervalmill", 10);
- conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
- conf1.setInt("replication.stats.thread.period.seconds", 5);
- conf1.setBoolean("hbase.tests.use.shortcircuit.reads", false);
- conf1.setLong("replication.sleep.before.failover", 2000);
- conf1.setInt("replication.source.maxretriesmultiplier", 10);
- conf1.setFloat("replication.source.ratio", 1.0f);
- conf1.setBoolean("replication.source.eof.autorecovery", true);
- conf1.setLong("hbase.serial.replication.waiting.ms", 100);
-
- utility1 = new HBaseTestingUtility(conf1);
-
- // Base conf2 on conf1 so it gets the right zk cluster.
- conf2 = HBaseConfiguration.create(conf1);
+ conf.setInt("replication.source.size.capacity", 102400);
+ conf.setLong("replication.source.sleepforretries", 100);
+ conf.setInt("hbase.regionserver.maxlogs", 10);
+ conf.setLong("hbase.master.logcleaner.ttl", 10);
+ conf.setInt("zookeeper.recovery.retry", 1);
+ conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+ conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+ conf.setInt("replication.stats.thread.period.seconds", 5);
+ conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+ conf.setLong("replication.sleep.before.failover", 2000);
+ conf.setInt("replication.source.maxretriesmultiplier", 10);
+ conf.setFloat("replication.source.ratio", 1.0f);
+ conf.setBoolean("replication.source.eof.autorecovery", true);
+ conf.setLong("hbase.serial.replication.waiting.ms", 100);
+ }
+
+ static void configureClusters(HBaseTestingUtility util1,
+ HBaseTestingUtility util2) {
+ setupConfig(util1, "/1");
+ setupConfig(util2, "/2");
+
+ Configuration conf2 = util2.getConfiguration();
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
conf2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 6);
conf2.setBoolean("hbase.tests.use.shortcircuit.reads", false);
-
- utility2 = new HBaseTestingUtility(conf2);
}
protected static void restartHBaseCluster(HBaseTestingUtility util, int numSlaves)
throws Exception {
util.shutdownMiniHBaseCluster();
- util
- .startMiniHBaseCluster(StartMiniClusterOption.builder().numRegionServers(numSlaves).build());
+ util.restartHBaseCluster(numSlaves);
}
- protected static void startClusters() throws Exception{
- utility1.startMiniZKCluster();
- MiniZooKeeperCluster miniZK = utility1.getZkCluster();
- // Have to reget conf1 in case zk cluster location different
- // than default
- conf1 = utility1.getConfiguration();
- zkw1 = new ZKWatcher(conf1, "cluster1", null, true);
- admin = new ReplicationAdmin(conf1);
+ protected static void startClusters() throws Exception {
+ UTIL1.startMiniZKCluster();
+ MiniZooKeeperCluster miniZK = UTIL1.getZkCluster();
+ admin = new ReplicationAdmin(CONF1);
LOG.info("Setup first Zk");
- utility2.setZkCluster(miniZK);
- zkw2 = new ZKWatcher(conf2, "cluster2", null, true);
+ UTIL2.setZkCluster(miniZK);
LOG.info("Setup second Zk");
- CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
- utility1.startMiniCluster(NUM_SLAVES1);
+ CONF_WITH_LOCALFS = HBaseConfiguration.create(CONF1);
+ UTIL1.startMiniCluster(NUM_SLAVES1);
// Have a bunch of slave servers, because inter-cluster shipping logic uses number of sinks
// as a component in deciding maximum number of parallel batches to send to the peer cluster.
- utility2.startMiniCluster(NUM_SLAVES2);
+ UTIL2.startMiniCluster(NUM_SLAVES2);
- hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
+ hbaseAdmin = ConnectionFactory.createConnection(CONF1).getAdmin();
TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
.setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
- scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (ColumnFamilyDescriptor f : table.getColumnFamilies()) {
- scopes.put(f.getName(), f.getScope());
- }
- Connection connection1 = ConnectionFactory.createConnection(conf1);
- Connection connection2 = ConnectionFactory.createConnection(conf2);
+
+ Connection connection1 = ConnectionFactory.createConnection(CONF1);
+ Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
}
- utility1.waitUntilAllRegionsAssigned(tableName);
- utility2.waitUntilAllRegionsAssigned(tableName);
+ UTIL1.waitUntilAllRegionsAssigned(tableName);
+ UTIL2.waitUntilAllRegionsAssigned(tableName);
htable1 = connection1.getTable(tableName);
htable2 = connection2.getTable(tableName);
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- configureClusters();
+ configureClusters(UTIL1, UTIL2);
startClusters();
}
@@ -269,7 +254,7 @@ public class TestReplicationBase {
public void setUpBase() throws Exception {
if (!peerExist(PEER_ID2)) {
ReplicationPeerConfig rpc = ReplicationPeerConfig.newBuilder()
- .setClusterKey(utility2.getClusterKey()).setSerial(isSerialPeer()).build();
+ .setClusterKey(UTIL2.getClusterKey()).setSerial(isSerialPeer()).build();
hbaseAdmin.addReplicationPeer(PEER_ID2, rpc);
}
}
@@ -285,7 +270,7 @@ public class TestReplicationBase {
Put put = new Put(row);
put.addColumn(famName, row, row);
- htable1 = utility1.getConnection().getTable(tableName);
+ htable1 = UTIL1.getConnection().getTable(tableName);
htable1.put(put);
Get get = new Get(row);
@@ -340,7 +325,7 @@ public class TestReplicationBase {
htable2.close();
htable1.close();
admin.close();
- utility2.shutdownMiniCluster();
- utility1.shutdownMiniCluster();
+ UTIL2.shutdownMiniCluster();
+ UTIL1.shutdownMiniCluster();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
index b94b443..a7d983c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
@@ -82,11 +82,11 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
public void setUp() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
- for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+ for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
- utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+ UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
- utility1.deleteTableData(tableName);
+ UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
@@ -117,7 +117,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
@Test
public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
LOG.info("testSimplePutDelete");
- MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
+ MiniHBaseCluster peerCluster = UTIL2.getMiniHBaseCluster();
int numRS = peerCluster.getRegionServerThreads().size();
doPutTest(Bytes.toBytes(1));
@@ -144,7 +144,7 @@ public class TestReplicationChangingPeerRegionservers extends TestReplicationBas
put.addColumn(famName, row, row);
if (htable1 == null) {
- htable1 = utility1.getConnection().getTable(tableName);
+ htable1 = UTIL1.getConnection().getTable(tableName);
}
htable1.put(put);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
index dbb5164..4c034b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDisableInactivePeer.java
@@ -54,7 +54,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
*/
@Test
public void testDisableInactivePeer() throws Exception {
- utility2.shutdownMiniHBaseCluster();
+ UTIL2.shutdownMiniHBaseCluster();
byte[] rowkey = Bytes.toBytes("disable inactive peer");
Put put = new Put(rowkey);
@@ -67,7 +67,7 @@ public class TestReplicationDisableInactivePeer extends TestReplicationBase {
// disable and start the peer
admin.disablePeer("2");
StartMiniClusterOption option = StartMiniClusterOption.builder().numRegionServers(2).build();
- utility2.startMiniHBaseCluster(option);
+ UTIL2.startMiniHBaseCluster(option);
Get get = new Get(rowkey);
for (int i = 0; i < NB_RETRIES; i++) {
Result res = htable2.get(get);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
index 31750a8..1d391d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationDroppedTables.java
@@ -63,14 +63,14 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
public void setUpBase() throws Exception {
// Starting and stopping replication can make us miss new logs,
// rolling like this makes sure the most recent one gets added to the queue
- for (JVMClusterUtil.RegionServerThread r : utility1.getHBaseCluster()
+ for (JVMClusterUtil.RegionServerThread r : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
- utility1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
+ UTIL1.getAdmin().rollWALWriter(r.getRegionServer().getServerName());
}
// Initialize the peer after wal rolling, so that we will abandon the stuck WALs.
super.setUpBase();
- int rowCount = utility1.countRows(tableName);
- utility1.deleteTableData(tableName);
+ int rowCount = UTIL1.countRows(tableName);
+ UTIL1.deleteTableData(tableName);
// truncating the table will send one Delete per row to the slave cluster
// in an async fashion, which is why we cannot just call deleteTableData on
// utility2 since late writes could make it to the slave in some way.
@@ -101,7 +101,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
// when replicate at sink side, it'll apply to rs group by table name, so the WAL of test table
// may apply first, and then test_dropped table, and we will believe that the replication is not
// got stuck (HBASE-20475).
- conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
+ CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 10 * 1024);
}
@Test
@@ -121,11 +121,11 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
@Test
public void testEditsDroppedWithDroppedTableNS() throws Exception {
// also try with a namespace
- Connection connection1 = ConnectionFactory.createConnection(conf1);
+ Connection connection1 = ConnectionFactory.createConnection(CONF1);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createNamespace(NamespaceDescriptor.create("NS").build());
}
- Connection connection2 = ConnectionFactory.createConnection(conf2);
+ Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin2 = connection2.getAdmin()) {
admin2.createNamespace(NamespaceDescriptor.create("NS").build());
}
@@ -143,13 +143,13 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
}
private void testEditsBehindDroppedTable(boolean allowProceeding, String tName) throws Exception {
- conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
- conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+ CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, allowProceeding);
+ CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
// make sure we have a single region server only, so that all
// edits for all tables go there
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
+ UTIL1.shutdownMiniHBaseCluster();
+ UTIL1.startMiniHBaseCluster();
TableName tablename = TableName.valueOf(tName);
byte[] familyName = Bytes.toBytes("fam");
@@ -161,16 +161,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
.newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
- Connection connection1 = ConnectionFactory.createConnection(conf1);
- Connection connection2 = ConnectionFactory.createConnection(conf2);
+ Connection connection1 = ConnectionFactory.createConnection(CONF1);
+ Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table);
}
- utility1.waitUntilAllRegionsAssigned(tablename);
- utility2.waitUntilAllRegionsAssigned(tablename);
+ UTIL1.waitUntilAllRegionsAssigned(tablename);
+ UTIL2.waitUntilAllRegionsAssigned(tablename);
// now suspend replication
try (Admin admin1 = connection1.getAdmin()) {
@@ -213,18 +213,18 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
verifyReplicationStuck();
}
// just to be safe
- conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+ CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
}
@Test
public void testEditsBehindDroppedTableTiming() throws Exception {
- conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
- conf1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
+ CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, true);
+ CONF1.setInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY, 1);
// make sure we have a single region server only, so that all
// edits for all tables go there
- utility1.shutdownMiniHBaseCluster();
- utility1.startMiniHBaseCluster();
+ UTIL1.shutdownMiniHBaseCluster();
+ UTIL1.startMiniHBaseCluster();
TableName tablename = TableName.valueOf("testdroppedtimed");
byte[] familyName = Bytes.toBytes("fam");
@@ -236,16 +236,16 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
.newBuilder(familyName).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build();
- Connection connection1 = ConnectionFactory.createConnection(conf1);
- Connection connection2 = ConnectionFactory.createConnection(conf2);
+ Connection connection1 = ConnectionFactory.createConnection(CONF1);
+ Connection connection2 = ConnectionFactory.createConnection(CONF2);
try (Admin admin1 = connection1.getAdmin()) {
admin1.createTable(table);
}
try (Admin admin2 = connection2.getAdmin()) {
admin2.createTable(table);
}
- utility1.waitUntilAllRegionsAssigned(tablename);
- utility2.waitUntilAllRegionsAssigned(tablename);
+ UTIL1.waitUntilAllRegionsAssigned(tablename);
+ UTIL2.waitUntilAllRegionsAssigned(tablename);
// now suspend replication
try (Admin admin1 = connection1.getAdmin()) {
@@ -290,7 +290,7 @@ public class TestReplicationDroppedTables extends TestReplicationBase {
verifyReplicationProceeded();
}
// just to be safe
- conf1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
+ CONF1.setBoolean(HConstants.REPLICATION_DROP_ON_DELETED_TABLE_KEY, false);
}
private boolean peerHasAllNormalRows() throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
index 4effe41..c0f22a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEmptyWALRecovery.java
@@ -55,16 +55,16 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
* @param numRs number of regionservers
*/
private void waitForLogAdvance(int numRs) throws Exception {
- Waiter.waitFor(conf1, 10000, new Waiter.Predicate<Exception>() {
+ Waiter.waitFor(CONF1, 10000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (int i = 0; i < numRs; i++) {
- HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+ HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
RegionInfo regionInfo =
- utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
Path currentFile = ((AbstractFSWAL<?>) wal).getCurrentFileName();
- Replication replicationService = (Replication) utility1.getHBaseCluster()
+ Replication replicationService = (Replication) UTIL1.getHBaseCluster()
.getRegionServer(i).getReplicationSourceService();
for (ReplicationSourceInterface rsi : replicationService.getReplicationManager()
.getSources()) {
@@ -81,19 +81,19 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
@Test
public void testEmptyWALRecovery() throws Exception {
- final int numRs = utility1.getHBaseCluster().getRegionServerThreads().size();
+ final int numRs = UTIL1.getHBaseCluster().getRegionServerThreads().size();
// for each RS, create an empty wal with same walGroupId
final List<Path> emptyWalPaths = new ArrayList<>();
long ts = System.currentTimeMillis();
for (int i = 0; i < numRs; i++) {
RegionInfo regionInfo =
- utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
- WAL wal = utility1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
+ UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ WAL wal = UTIL1.getHBaseCluster().getRegionServer(i).getWAL(regionInfo);
Path currentWalPath = AbstractFSWALProvider.getCurrentFileName(wal);
String walGroupId = AbstractFSWALProvider.getWALPrefixFromWALName(currentWalPath.getName());
- Path emptyWalPath = new Path(utility1.getDataTestDir(), walGroupId + "." + ts);
- utility1.getTestFileSystem().create(emptyWalPath).close();
+ Path emptyWalPath = new Path(UTIL1.getDataTestDir(), walGroupId + "." + ts);
+ UTIL1.getTestFileSystem().create(emptyWalPath).close();
emptyWalPaths.add(emptyWalPath);
}
@@ -102,12 +102,12 @@ public class TestReplicationEmptyWALRecovery extends TestReplicationBase {
// determine if the file being replicated currently is still opened for write, so just inject a
// new wal to the replication queue does not mean the previous file is closed.
for (int i = 0; i < numRs; i++) {
- HRegionServer hrs = utility1.getHBaseCluster().getRegionServer(i);
+ HRegionServer hrs = UTIL1.getHBaseCluster().getRegionServer(i);
Replication replicationService = (Replication) hrs.getReplicationSourceService();
replicationService.getReplicationManager().preLogRoll(emptyWalPaths.get(i));
replicationService.getReplicationManager().postLogRoll(emptyWalPaths.get(i));
RegionInfo regionInfo =
- utility1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
+ UTIL1.getHBaseCluster().getRegions(htable1.getName()).get(0).getRegionInfo();
WAL wal = hrs.getWAL(regionInfo);
wal.rollWriter(true);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 03fbb59..b909d8f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -83,7 +83,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TestReplicationBase.setUpBeforeClass();
- numRegionServers = utility1.getHBaseCluster().getRegionServerThreads().size();
+ numRegionServers = UTIL1.getHBaseCluster().getRegionServerThreads().size();
}
@AfterClass
@@ -101,12 +101,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
ReplicationEndpointReturningFalse.replicated.set(false);
ReplicationEndpointForTest.lastEntries = null;
final List<RegionServerThread> rsThreads =
- utility1.getMiniHBaseCluster().getRegionServerThreads();
+ UTIL1.getMiniHBaseCluster().getRegionServerThreads();
for (RegionServerThread rs : rsThreads) {
- utility1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
+ UTIL1.getAdmin().rollWALWriter(rs.getRegionServer().getServerName());
}
// Wait for all log roll to finish
- utility1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
+ UTIL1.waitFor(3000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
for (RegionServerThread rs : rsThreads) {
@@ -134,18 +134,18 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testCustomReplicationEndpoint() throws Exception {
// test installing a custom replication endpoint other than the default one.
admin.addPeer("testCustomReplicationEndpoint",
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
// check whether the class has been constructed and started
- Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.contructedCount.get() >= numRegionServers;
}
});
- Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.startedCount.get() >= numRegionServers;
@@ -157,7 +157,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
// now replicate some data.
doPut(Bytes.toBytes("row42"));
- Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
@@ -176,7 +176,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
int peerCount = admin.getPeersCount();
final String id = "testReplicationEndpointReturnsFalseOnReplicate";
admin.addPeer(id,
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointReturningFalse.class.getName()), null);
// This test is flakey and then there is so much stuff flying around in here its, hard to
// debug. Peer needs to be up for the edit to make it across. This wait on
@@ -188,7 +188,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
// now replicate some data
doPut(row);
- Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
// Looks like replication endpoint returns false unless we put more than 10 edits. We
@@ -209,7 +209,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
public void testInterClusterReplication() throws Exception {
final String id = "testInterClusterReplication";
- List<HRegion> regions = utility1.getHBaseCluster().getRegions(tableName);
+ List<HRegion> regions = UTIL1.getHBaseCluster().getRegions(tableName);
int totEdits = 0;
// Make sure edits are spread across regions because we do region based batching
@@ -228,12 +228,12 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
admin.addPeer(id,
- new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf2))
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF2))
.setReplicationEndpointImpl(InterClusterReplicationEndpointForTest.class.getName()),
null);
final int numEdits = totEdits;
- Waiter.waitFor(conf1, 30000, new Waiter.ExplainingPredicate<Exception>() {
+ Waiter.waitFor(CONF1, 30000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return InterClusterReplicationEndpointForTest.replicateCount.get() == numEdits;
@@ -248,26 +248,27 @@ public class TestReplicationEndpoint extends TestReplicationBase {
});
admin.removePeer("testInterClusterReplication");
- utility1.deleteTableData(tableName);
+ UTIL1.deleteTableData(tableName);
}
@Test
public void testWALEntryFilterFromReplicationEndpoint() throws Exception {
- ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ ReplicationPeerConfig rpc =
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
- //test that we can create mutliple WALFilters reflectively
+ // test that we can create mutliple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
- EverythingPassesWALEntryFilter.class.getName() +
- "," + EverythingPassesWALEntryFilterSubclass.class.getName());
+ EverythingPassesWALEntryFilter.class.getName() + "," +
+ EverythingPassesWALEntryFilterSubclass.class.getName());
admin.addPeer("testWALEntryFilterFromReplicationEndpoint", rpc);
// now replicate some data.
- try (Connection connection = ConnectionFactory.createConnection(conf1)) {
+ try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
doPut(connection, Bytes.toBytes("row1"));
doPut(connection, row);
doPut(connection, Bytes.toBytes("row2"));
}
- Waiter.waitFor(conf1, 60000, new Waiter.Predicate<Exception>() {
+ Waiter.waitFor(CONF1, 60000, new Waiter.Predicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return ReplicationEndpointForTest.replicateCount.get() >= 1;
@@ -280,37 +281,38 @@ public class TestReplicationEndpoint extends TestReplicationBase {
admin.removePeer("testWALEntryFilterFromReplicationEndpoint");
}
- @Test (expected=IOException.class)
+ @Test(expected = IOException.class)
public void testWALEntryFilterAddValidation() throws Exception {
- ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ ReplicationPeerConfig rpc =
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
- //test that we can create mutliple WALFilters reflectively
+ // test that we can create multiple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
- "IAmNotARealWalEntryFilter");
+ "IAmNotARealWalEntryFilter");
admin.addPeer("testWALEntryFilterAddValidation", rpc);
}
- @Test (expected=IOException.class)
+ @Test(expected = IOException.class)
public void testWALEntryFilterUpdateValidation() throws Exception {
- ReplicationPeerConfig rpc = new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(conf1))
+ ReplicationPeerConfig rpc =
+ new ReplicationPeerConfig().setClusterKey(ZKConfig.getZooKeeperClusterKey(CONF1))
.setReplicationEndpointImpl(ReplicationEndpointWithWALEntryFilter.class.getName());
- //test that we can create mutliple WALFilters reflectively
+ // test that we can create multiple WALFilters reflectively
rpc.getConfiguration().put(BaseReplicationEndpoint.REPLICATION_WALENTRYFILTER_CONFIG_KEY,
- "IAmNotARealWalEntryFilter");
+ "IAmNotARealWalEntryFilter");
admin.updatePeerConfig("testWALEntryFilterUpdateValidation", rpc);
}
-
@Test
- public void testMetricsSourceBaseSourcePassthrough(){
+ public void testMetricsSourceBaseSourcePassthrough() {
/*
- The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl
- and a MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces.
- Both of those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which
- allows for custom JMX metrics.
- This test checks to make sure the BaseSource decorator logic on MetricsSource actually calls down through
- the two layers of wrapping to the actual BaseSource.
- */
+ * The replication MetricsSource wraps a MetricsReplicationSourceSourceImpl and a
+ * MetricsReplicationGlobalSourceSource, so that metrics get written to both namespaces. Both of
+ * those classes wrap a MetricsReplicationSourceImpl that implements BaseSource, which allows
+ * for custom JMX metrics. This test checks to make sure the BaseSource decorator logic on
+ * MetricsSource actually calls down through the two layers of wrapping to the actual
+ * BaseSource.
+ */
String id = "id";
DynamicMetricsRegistry mockRegistry = mock(DynamicMetricsRegistry.class);
MetricsReplicationSourceImpl singleRms = mock(MetricsReplicationSourceImpl.class);
@@ -318,15 +320,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
MetricsReplicationSourceImpl globalRms = mock(MetricsReplicationSourceImpl.class);
when(globalRms.getMetricsRegistry()).thenReturn(mockRegistry);
-
- MetricsReplicationSourceSource singleSourceSource = new MetricsReplicationSourceSourceImpl(singleRms, id);
- MetricsReplicationSourceSource globalSourceSource = new MetricsReplicationGlobalSourceSource(globalRms);
+ MetricsReplicationSourceSource singleSourceSource =
+ new MetricsReplicationSourceSourceImpl(singleRms, id);
+ MetricsReplicationSourceSource globalSourceSource =
+ new MetricsReplicationGlobalSourceSource(globalRms);
MetricsReplicationSourceSource spyglobalSourceSource = spy(globalSourceSource);
doNothing().when(spyglobalSourceSource).incrFailedRecoveryQueue();
Map<String, MetricsReplicationSourceSource> singleSourceSourceByTable = new HashMap<>();
- MetricsSource source = new MetricsSource(id, singleSourceSource, spyglobalSourceSource,
- singleSourceSourceByTable);
+ MetricsSource source =
+ new MetricsSource(id, singleSourceSource, spyglobalSourceSource, singleSourceSourceByTable);
String gaugeName = "gauge";
@@ -388,7 +391,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
private void doPut(byte[] row) throws IOException {
- try (Connection connection = ConnectionFactory.createConnection(conf1)) {
+ try (Connection connection = ConnectionFactory.createConnection(CONF1)) {
doPut(connection, row);
}
}
@@ -413,7 +416,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
public static class ReplicationEndpointForTest extends BaseReplicationEndpoint {
- static UUID uuid = utility1.getRandomUUID();
+ static UUID uuid = UTIL1.getRandomUUID();
static AtomicInteger contructedCount = new AtomicInteger();
static AtomicInteger startedCount = new AtomicInteger();
static AtomicInteger stoppedCount = new AtomicInteger();
@@ -562,7 +565,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
}
- public static class EverythingPassesWALEntryFilterSubclass extends EverythingPassesWALEntryFilter {
-
+ public static class EverythingPassesWALEntryFilterSubclass
+ extends EverythingPassesWALEntryFilter {
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
index 41cc9bc..ae99eb8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRS.java
@@ -37,6 +37,6 @@ public class TestReplicationKillMasterRS extends TestReplicationKillRS {
@Test
public void killOneMasterRS() throws Exception {
- loadTableAndKillRS(utility1);
+ loadTableAndKillRS(UTIL1);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java
index 6cbae83..e649149 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSCompressed.java
@@ -41,7 +41,7 @@ public class TestReplicationKillMasterRSCompressed extends TestReplicationKillMa
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
+ CONF1.setBoolean(HConstants.ENABLE_WAL_COMPRESSION, true);
TestReplicationBase.setUpBeforeClass();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java
index 108f274..aa3aadd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillMasterRSWithSeparateOldWALs.java
@@ -36,12 +36,12 @@ public class TestReplicationKillMasterRSWithSeparateOldWALs extends TestReplicat
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
+ CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void killOneMasterRS() throws Exception {
- loadTableAndKillRS(utility1);
+ loadTableAndKillRS(UTIL1);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
index 5b4fa2a..c245726 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillRS.java
@@ -57,10 +57,10 @@ public class TestReplicationKillRS extends TestReplicationBase {
Thread killer = killARegionServer(util, 5000, rsToKill1);
Result[] res;
int initialCount;
- try (Connection conn = ConnectionFactory.createConnection(conf1)) {
+ try (Connection conn = ConnectionFactory.createConnection(CONF1)) {
try (Table table = conn.getTable(tableName)) {
LOG.info("Start loading table");
- initialCount = utility1.loadTable(table, famName);
+ initialCount = UTIL1.loadTable(table, famName);
LOG.info("Done loading table");
killer.join(5000);
LOG.info("Done waiting for threads");
@@ -86,7 +86,7 @@ public class TestReplicationKillRS extends TestReplicationBase {
int lastCount = 0;
final long start = System.currentTimeMillis();
int i = 0;
- try (Connection conn = ConnectionFactory.createConnection(conf2)) {
+ try (Connection conn = ConnectionFactory.createConnection(CONF2)) {
try (Table table = conn.getTable(tableName)) {
while (true) {
if (i == NB_RETRIES - 1) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
index 96630b2..733fa3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRS.java
@@ -37,6 +37,6 @@ public class TestReplicationKillSlaveRS extends TestReplicationKillRS {
@Test
public void killOneSlaveRS() throws Exception {
- loadTableAndKillRS(utility2);
+ loadTableAndKillRS(UTIL2);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java
index a852b81..abff3e2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationKillSlaveRSWithSeparateOldWALs.java
@@ -36,12 +36,12 @@ public class TestReplicationKillSlaveRSWithSeparateOldWALs extends TestReplicati
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
+ CONF1.setBoolean(AbstractFSWALProvider.SEPARATE_OLDLOGDIR, true);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void killOneSlaveRS() throws Exception {
- loadTableAndKillRS(utility2);
+ loadTableAndKillRS(UTIL2);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
index 8ff4d84..c646a90 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationMetricsforUI.java
@@ -42,7 +42,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
@Test
public void testReplicationMetrics() throws Exception {
- try (Admin hbaseAdmin = utility1.getConnection().getAdmin()) {
+ try (Admin hbaseAdmin = UTIL1.getConnection().getAdmin()) {
Put p = new Put(Bytes.toBytes("starter"));
p.addColumn(famName, qualName, Bytes.toBytes("value help to test replication delay"));
htable1.put(p);
@@ -52,7 +52,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
}
// sleep 5 seconds to make sure timePassedAfterLastShippedOp > 2 * ageOfLastShippedOp
Thread.sleep(5000);
- HRegionServer rs = utility1.getRSForFirstRegionInTable(tableName);
+ HRegionServer rs = UTIL1.getRSForFirstRegionInTable(tableName);
Map<String, ReplicationStatus> metrics = rs.getWalGroupsReplicationStatus();
Assert.assertEquals("metric size ", 1, metrics.size());
long lastPosition = 0;
@@ -72,7 +72,7 @@ public class TestReplicationMetricsforUI extends TestReplicationBase {
.size() == 0) {
Thread.sleep(500);
}
- rs = utility1.getRSForFirstRegionInTable(tableName);
+ rs = UTIL1.getRSForFirstRegionInTable(tableName);
metrics = rs.getWalGroupsReplicationStatus();
Path lastPath = null;
for (Map.Entry<String, ReplicationStatus> metric : metrics.entrySet()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 2c8dc4c..b8b9678 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -102,7 +102,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
final byte[] v1 = Bytes.toBytes("v1");
final byte[] v2 = Bytes.toBytes("v2");
final byte[] v3 = Bytes.toBytes("v3");
- htable1 = utility1.getConnection().getTable(tableName);
+ htable1 = UTIL1.getConnection().getTable(tableName);
long t = EnvironmentEdgeManager.currentTime();
// create three versions for "row"
@@ -265,7 +265,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
}
}
ReplicationPeerConfig rpc =
- ReplicationPeerConfig.newBuilder().setClusterKey(utility2.getClusterKey()).build();
+ ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
hbaseAdmin.addReplicationPeer(PEER_ID, rpc);
Thread.sleep(SLEEP_TIME);
rowKey = Bytes.toBytes("do rep");
@@ -363,7 +363,7 @@ public class TestReplicationSmallTests extends TestReplicationBase {
final String colFam = "cf1";
final int numOfTables = 3;
- Admin hadmin = utility1.getAdmin();
+ Admin hadmin = UTIL1.getAdmin();
// Create Tables
for (int i = 0; i < numOfTables; i++) {
@@ -408,15 +408,15 @@ public class TestReplicationSmallTests extends TestReplicationBase {
public void testReplicationInReplay() throws Exception {
final TableName tableName = htable1.getName();
- HRegion region = utility1.getMiniHBaseCluster().getRegions(tableName).get(0);
+ HRegion region = UTIL1.getMiniHBaseCluster().getRegions(tableName).get(0);
RegionInfo hri = region.getRegionInfo();
NavigableMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (byte[] fam : htable1.getDescriptor().getColumnFamilyNames()) {
scopes.put(fam, 1);
}
final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
- int index = utility1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
- WAL wal = utility1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
+ int index = UTIL1.getMiniHBaseCluster().getServerWith(hri.getRegionName());
+ WAL wal = UTIL1.getMiniHBaseCluster().getRegionServer(index).getWAL(region.getRegionInfo());
final byte[] rowName = Bytes.toBytes("testReplicationInReplay");
final byte[] qualifier = Bytes.toBytes("q");
final byte[] value = Bytes.toBytes("v");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
index a305b66..7eddc5c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatus.java
@@ -54,7 +54,7 @@ public class TestReplicationStatus extends TestReplicationBase {
*/
@Test
public void testReplicationStatus() throws Exception {
- Admin hbaseAdmin = utility1.getAdmin();
+ Admin hbaseAdmin = UTIL1.getAdmin();
// disable peer
hbaseAdmin.disableReplicationPeer(PEER_ID2);
@@ -69,7 +69,7 @@ public class TestReplicationStatus extends TestReplicationBase {
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
- for (JVMClusterUtil.RegionServerThread thread : utility1.getHBaseCluster()
+ for (JVMClusterUtil.RegionServerThread thread : UTIL1.getHBaseCluster()
.getRegionServerThreads()) {
ServerName server = thread.getRegionServer().getServerName();
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
@@ -88,10 +88,10 @@ public class TestReplicationStatus extends TestReplicationBase {
}
// Stop rs1, then the queue of rs1 will be transfered to rs0
- utility1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
+ UTIL1.getHBaseCluster().getRegionServer(1).stop("Stop RegionServer");
Thread.sleep(10000);
metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
- ServerName server = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ ServerName server = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
ServerMetrics sm = metrics.getLiveServerMetrics().get(server);
List<ReplicationLoadSource> rLoadSourceList = sm.getReplicationLoadSourceList();
// check SourceList still only has one entry
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
index 0ec98e9..edeaf9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusAfterLagging.java
@@ -44,18 +44,18 @@ public class TestReplicationStatusAfterLagging extends TestReplicationBase {
@Test
public void testReplicationStatusAfterLagging() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- restartHBaseCluster(utility1, 1);
+ UTIL2.shutdownMiniHBaseCluster();
+ restartHBaseCluster(UTIL1, 1);
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(famName, Bytes.toBytes("col1"), Bytes.toBytes("val" + i));
htable1.put(p);
}
- utility2.startMiniHBaseCluster();
+ UTIL2.startMiniHBaseCluster();
Thread.sleep(10000);
- Admin hbaseAdmin = utility1.getAdmin();
- ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ Admin hbaseAdmin = UTIL1.getAdmin();
+ ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =
metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
index 75255bb..16d3822 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusBothNormalAndRecoveryLagging.java
@@ -44,7 +44,7 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli
@Test
public void testReplicationStatusBothNormalAndRecoveryLagging() throws Exception {
- utility2.shutdownMiniHBaseCluster();
+ UTIL2.shutdownMiniHBaseCluster();
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@@ -52,9 +52,9 @@ public class TestReplicationStatusBothNormalAndRecoveryLagging extends TestRepli
htable1.put(p);
}
Thread.sleep(10000);
- restartHBaseCluster(utility1, 1);
- Admin hbaseAdmin = utility1.getAdmin();
- ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ restartHBaseCluster(UTIL1, 1);
+ Admin hbaseAdmin = UTIL1.getAdmin();
+ ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000);
// add more values to cluster 1, these should cause normal queue to lag
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
index a8f266a..6deb095 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNewOp.java
@@ -45,9 +45,9 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe
@Test
public void testReplicationStatusSourceStartedTargetStoppedNewOp() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- restartHBaseCluster(utility1, 1);
- Admin hbaseAdmin = utility1.getAdmin();
+ UTIL2.shutdownMiniHBaseCluster();
+ restartHBaseCluster(UTIL1, 1);
+ Admin hbaseAdmin = UTIL1.getAdmin();
// add some values to source cluster
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@@ -55,7 +55,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedNewOp extends TestRe
htable1.put(p);
}
Thread.sleep(10000);
- ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =
metrics.getLiveServerMetrics().get(serverName).getReplicationLoadSourceList();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
index 6625eb5..01f49f4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedNoOps.java
@@ -42,10 +42,10 @@ public class TestReplicationStatusSourceStartedTargetStoppedNoOps extends TestRe
@Test
public void testReplicationStatusSourceStartedTargetStoppedNoOps() throws Exception {
- utility2.shutdownMiniHBaseCluster();
- restartHBaseCluster(utility1, 1);
- Admin hbaseAdmin = utility1.getAdmin();
- ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ UTIL2.shutdownMiniHBaseCluster();
+ restartHBaseCluster(UTIL1, 1);
+ Admin hbaseAdmin = UTIL1.getAdmin();
+ ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000);
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
index 0b5f6e8..fde87bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStatusSourceStartedTargetStoppedWithRecovery.java
@@ -46,7 +46,7 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
@Test
public void testReplicationStatusSourceStartedTargetStoppedWithRecovery() throws Exception {
- utility2.shutdownMiniHBaseCluster();
+ UTIL2.shutdownMiniHBaseCluster();
// add some values to cluster 1
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
Put p = new Put(Bytes.toBytes("row" + i));
@@ -54,9 +54,9 @@ public class TestReplicationStatusSourceStartedTargetStoppedWithRecovery
htable1.put(p);
}
Thread.sleep(10000);
- restartHBaseCluster(utility1, 1);
- Admin hbaseAdmin = utility1.getAdmin();
- ServerName serverName = utility1.getHBaseCluster().getRegionServer(0).getServerName();
+ restartHBaseCluster(UTIL1, 1);
+ Admin hbaseAdmin = UTIL1.getAdmin();
+ ServerName serverName = UTIL1.getHBaseCluster().getRegionServer(0).getServerName();
Thread.sleep(10000);
ClusterMetrics metrics = hbaseAdmin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
List<ReplicationLoadSource> loadSources =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 745c439..c764966 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -17,30 +17,19 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_RETRIES;
+import static org.apache.hadoop.hbase.replication.TestReplicationBase.NB_ROWS_IN_BATCH;
+import static org.apache.hadoop.hbase.replication.TestReplicationBase.SLEEP_TIME;
import static org.junit.Assert.assertEquals;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
-import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.util.ToolRunner;
-import org.junit.After;
-import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -48,183 +37,56 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ ReplicationTests.class, LargeTests.class })
-public class TestReplicationSyncUpTool extends TestReplicationBase {
+public class TestReplicationSyncUpTool extends TestReplicationSyncUpToolBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
+ HBaseClassTestRule.forClass(TestReplicationSyncUpTool.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSyncUpTool.class);
- private static final TableName t1_su = TableName.valueOf("t1_syncup");
- private static final TableName t2_su = TableName.valueOf("t2_syncup");
-
- protected static final byte[] famName = Bytes.toBytes("cf1");
- private static final byte[] qualName = Bytes.toBytes("q1");
-
- protected static final byte[] noRepfamName = Bytes.toBytes("norep");
-
- private HTableDescriptor t1_syncupSource, t1_syncupTarget;
- private HTableDescriptor t2_syncupSource, t2_syncupTarget;
-
- protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
-
- @Before
- public void setUp() throws Exception {
- HColumnDescriptor fam;
-
- t1_syncupSource = new HTableDescriptor(t1_su);
- fam = new HColumnDescriptor(famName);
- fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
- t1_syncupSource.addFamily(fam);
- fam = new HColumnDescriptor(noRepfamName);
- t1_syncupSource.addFamily(fam);
-
- t1_syncupTarget = new HTableDescriptor(t1_su);
- fam = new HColumnDescriptor(famName);
- t1_syncupTarget.addFamily(fam);
- fam = new HColumnDescriptor(noRepfamName);
- t1_syncupTarget.addFamily(fam);
-
- t2_syncupSource = new HTableDescriptor(t2_su);
- fam = new HColumnDescriptor(famName);
- fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
- t2_syncupSource.addFamily(fam);
- fam = new HColumnDescriptor(noRepfamName);
- t2_syncupSource.addFamily(fam);
-
- t2_syncupTarget = new HTableDescriptor(t2_su);
- fam = new HColumnDescriptor(famName);
- t2_syncupTarget.addFamily(fam);
- fam = new HColumnDescriptor(noRepfamName);
- t2_syncupTarget.addFamily(fam);
- }
-
- @After
- public void tearDownBase() throws Exception {
- // Do nothing, just replace the super tearDown. because the super tearDown will use the
- // out-of-data HBase admin to remove replication peer, which will be result in failure.
- }
-
/**
- * Add a row to a table in each cluster, check it's replicated, delete it,
- * check's gone Also check the puts and deletes are not replicated back to
- * the originating cluster.
+ * Add a row to a table in each cluster, check it's replicated, delete it, check it's gone. Also
+ * check the puts and deletes are not replicated back to the originating cluster.
*/
@Test
public void testSyncUpTool() throws Exception {
/**
- * Set up Replication: on Master and one Slave
- * Table: t1_syncup and t2_syncup
- * columnfamily:
- * 'cf1' : replicated
- * 'norep': not replicated
+ * Set up replication on Master and one Slave. Tables: t1_syncup and t2_syncup. Column families:
+ * 'cf1' (replicated) and 'norep' (not replicated).
*/
setupReplication();
/**
- * at Master:
- * t1_syncup: put 100 rows into cf1, and 1 rows into norep
- * t2_syncup: put 200 rows into cf1, and 1 rows into norep
- *
- * verify correctly replicated to slave
+ * At Master: t1_syncup: put 100 rows into cf1, and 1 row into norep; t2_syncup: put 200 rows
+ * into cf1, and 1 row into norep. Verify correctly replicated to Slave.
*/
putAndReplicateRows();
/**
- * Verify delete works
- *
- * step 1: stop hbase on Slave
- *
- * step 2: at Master:
- * t1_syncup: delete 50 rows from cf1
- * t2_syncup: delete 100 rows from cf1
- * no change on 'norep'
- *
- * step 3: stop hbase on master, restart hbase on Slave
- *
- * step 4: verify Slave still have the rows before delete
- * t1_syncup: 100 rows from cf1
- * t2_syncup: 200 rows from cf1
- *
- * step 5: run syncup tool on Master
- *
- * step 6: verify that delete show up on Slave
- * t1_syncup: 50 rows from cf1
- * t2_syncup: 100 rows from cf1
- *
- * verify correctly replicated to Slave
+ * Verify delete works. Step 1: stop hbase on Slave. Step 2: at Master, delete 50 rows from cf1
+ * of t1_syncup and 100 rows from cf1 of t2_syncup; no change on 'norep'. Step 3: stop hbase on
+ * Master, restart hbase on Slave. Step 4: verify Slave still has the rows from before the
+ * delete (t1_syncup: 100 rows from cf1, t2_syncup: 200 rows from cf1). Step 5: run syncup tool
+ * on Master. Step 6: verify that the deletes show up on Slave (t1_syncup: 50 rows from cf1,
+ * t2_syncup: 100 rows from cf1), i.e. they were correctly replicated to Slave.
*/
mimicSyncUpAfterDelete();
/**
- * Verify put works
- *
- * step 1: stop hbase on Slave
- *
- * step 2: at Master:
- * t1_syncup: put 100 rows from cf1
- * t2_syncup: put 200 rows from cf1
- * and put another row on 'norep'
- * ATTN: put to 'cf1' will overwrite existing rows, so end count will
- * be 100 and 200 respectively
- * put to 'norep' will add a new row.
- *
- * step 3: stop hbase on master, restart hbase on Slave
- *
- * step 4: verify Slave still has the rows before put
- * t1_syncup: 50 rows from cf1
- * t2_syncup: 100 rows from cf1
- *
- * step 5: run syncup tool on Master
- *
- * step 6: verify that put show up on Slave
- * and 'norep' does not
- * t1_syncup: 100 rows from cf1
- * t2_syncup: 200 rows from cf1
- *
- * verify correctly replicated to Slave
+ * Verify put works. Step 1: stop hbase on Slave. Step 2: at Master, put 100 rows into cf1 of
+ * t1_syncup and 200 rows into cf1 of t2_syncup, and put another row on 'norep'. ATTN: put to
+ * 'cf1' will overwrite existing rows, so end count will be 100 and 200 respectively; put to
+ * 'norep' will add a new row. Step 3: stop hbase on Master, restart hbase on Slave. Step 4:
+ * verify Slave still has the rows from before the put (t1_syncup: 50 rows from cf1, t2_syncup:
+ * 100 rows from cf1). Step 5: run syncup tool on Master. Step 6: verify that the puts show up
+ * on Slave and 'norep' does not (t1_syncup: 100 rows from cf1, t2_syncup: 200 rows from cf1),
+ * i.e. they were correctly replicated to Slave.
*/
mimicSyncUpAfterPut();
}
- protected void setupReplication() throws Exception {
- ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
- ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
-
- Admin ha = utility1.getAdmin();
- ha.createTable(t1_syncupSource);
- ha.createTable(t2_syncupSource);
- ha.close();
-
- ha = utility2.getAdmin();
- ha.createTable(t1_syncupTarget);
- ha.createTable(t2_syncupTarget);
- ha.close();
-
- Connection connection1 = ConnectionFactory.createConnection(utility1.getConfiguration());
- Connection connection2 = ConnectionFactory.createConnection(utility2.getConfiguration());
-
- // Get HTable from Master
- ht1Source = connection1.getTable(t1_su);
- ht2Source = connection1.getTable(t2_su);
-
- // Get HTable from Peer1
- ht1TargetAtPeer1 = connection2.getTable(t1_su);
- ht2TargetAtPeer1 = connection2.getTable(t2_su);
-
- /**
- * set M-S : Master: utility1 Slave1: utility2
- */
- ReplicationPeerConfig rpc = new ReplicationPeerConfig();
- rpc.setClusterKey(utility2.getClusterKey());
- admin1.addPeer("1", rpc, null);
-
- admin1.close();
- admin2.close();
- }
-
private void putAndReplicateRows() throws Exception {
LOG.debug("putAndReplicateRows");
// add rows to Master cluster,
@@ -233,46 +95,46 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
// 100 + 1 row to t1_syncup
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht1Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9999));
- p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
+ p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
ht1Source.put(p);
// 200 + 1 row to t2_syncup
for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht2Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9999));
- p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9999));
+ p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9999));
ht2Source.put(p);
// ensure replication completed
Thread.sleep(SLEEP_TIME);
- int rowCount_ht1Source = utility1.countRows(ht1Source);
+ int rowCountHt1Source = UTIL1.countRows(ht1Source);
for (int i = 0; i < NB_RETRIES; i++) {
- int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
- if (i==NB_RETRIES-1) {
- assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCount_ht1Source - 1,
- rowCount_ht1TargetAtPeer1);
+ int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+ if (i == NB_RETRIES - 1) {
+ assertEquals("t1_syncup has 101 rows on source, and 100 on slave1", rowCountHt1Source - 1,
+ rowCountHt1TargetAtPeer1);
}
- if (rowCount_ht1Source - 1 == rowCount_ht1TargetAtPeer1) {
+ if (rowCountHt1Source - 1 == rowCountHt1TargetAtPeer1) {
break;
}
Thread.sleep(SLEEP_TIME);
}
- int rowCount_ht2Source = utility1.countRows(ht2Source);
+ int rowCountHt2Source = UTIL1.countRows(ht2Source);
for (int i = 0; i < NB_RETRIES; i++) {
- int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
- if (i==NB_RETRIES-1) {
- assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCount_ht2Source - 1,
- rowCount_ht2TargetAtPeer1);
+ int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
+ if (i == NB_RETRIES - 1) {
+ assertEquals("t2_syncup has 201 rows on source, and 200 on slave1", rowCountHt2Source - 1,
+ rowCountHt2TargetAtPeer1);
}
- if (rowCount_ht2Source - 1 == rowCount_ht2TargetAtPeer1) {
+ if (rowCountHt2Source - 1 == rowCountHt2TargetAtPeer1) {
break;
}
Thread.sleep(SLEEP_TIME);
@@ -281,7 +143,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
private void mimicSyncUpAfterDelete() throws Exception {
LOG.debug("mimicSyncUpAfterDelete");
- utility2.shutdownMiniHBaseCluster();
+ UTIL2.shutdownMiniHBaseCluster();
List<Delete> list = new ArrayList<>();
// delete half of the rows
@@ -299,51 +161,50 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
}
ht2Source.delete(list);
- int rowCount_ht1Source = utility1.countRows(ht1Source);
+ int rowCount_ht1Source = UTIL1.countRows(ht1Source);
assertEquals("t1_syncup has 51 rows on source, after remove 50 of the replicated colfam", 51,
rowCount_ht1Source);
- int rowCount_ht2Source = utility1.countRows(ht2Source);
- assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam",
- 101, rowCount_ht2Source);
+ int rowCount_ht2Source = UTIL1.countRows(ht2Source);
+ assertEquals("t2_syncup has 101 rows on source, after remove 100 of the replicated colfam", 101,
+ rowCount_ht2Source);
- utility1.shutdownMiniHBaseCluster();
- utility2.restartHBaseCluster(1);
+ UTIL1.shutdownMiniHBaseCluster();
+ UTIL2.restartHBaseCluster(1);
Thread.sleep(SLEEP_TIME);
// before sync up
- int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
- int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
- assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
- assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+ int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+ int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
+ assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCountHt1TargetAtPeer1);
+ assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCountHt2TargetAtPeer1);
// After sync up
for (int i = 0; i < NB_RETRIES; i++) {
- syncUp(utility1);
- rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
- rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ syncUp(UTIL1);
+ rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+ rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
- if (rowCount_ht1TargetAtPeer1 != 50 || rowCount_ht2TargetAtPeer1 != 100) {
+ if (rowCountHt1TargetAtPeer1 != 50 || rowCountHt2TargetAtPeer1 != 100) {
// syncUP still failed. Let's look at the source in case anything wrong there
- utility1.restartHBaseCluster(1);
- rowCount_ht1Source = utility1.countRows(ht1Source);
+ UTIL1.restartHBaseCluster(1);
+ rowCount_ht1Source = UTIL1.countRows(ht1Source);
LOG.debug("t1_syncup should have 51 rows at source, and it is " + rowCount_ht1Source);
- rowCount_ht2Source = utility1.countRows(ht2Source);
+ rowCount_ht2Source = UTIL1.countRows(ht2Source);
LOG.debug("t2_syncup should have 101 rows at source, and it is " + rowCount_ht2Source);
}
assertEquals("@Peer1 t1_syncup should be sync up and have 50 rows", 50,
- rowCount_ht1TargetAtPeer1);
+ rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be sync up and have 100 rows", 100,
- rowCount_ht2TargetAtPeer1);
+ rowCountHt2TargetAtPeer1);
}
- if (rowCount_ht1TargetAtPeer1 == 50 && rowCount_ht2TargetAtPeer1 == 100) {
+ if (rowCountHt1TargetAtPeer1 == 50 && rowCountHt2TargetAtPeer1 == 100) {
LOG.info("SyncUpAfterDelete succeeded at retry = " + i);
break;
} else {
- LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
- + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
- + rowCount_ht2TargetAtPeer1);
+ LOG.debug("SyncUpAfterDelete failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" +
+ rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
}
Thread.sleep(SLEEP_TIME);
}
@@ -351,82 +212,77 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
private void mimicSyncUpAfterPut() throws Exception {
LOG.debug("mimicSyncUpAfterPut");
- utility1.restartHBaseCluster(1);
- utility2.shutdownMiniHBaseCluster();
+ UTIL1.restartHBaseCluster(1);
+ UTIL2.shutdownMiniHBaseCluster();
Put p;
// another 100 + 1 row to t1_syncup
// we should see 100 + 2 rows now
for (int i = 0; i < NB_ROWS_IN_BATCH; i++) {
p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht1Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9998));
- p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
+ p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
ht1Source.put(p);
// another 200 + 1 row to t1_syncup
// we should see 200 + 2 rows now
for (int i = 0; i < NB_ROWS_IN_BATCH * 2; i++) {
p = new Put(Bytes.toBytes("row" + i));
- p.addColumn(famName, qualName, Bytes.toBytes("val" + i));
+ p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes("val" + i));
ht2Source.put(p);
}
p = new Put(Bytes.toBytes("row" + 9998));
- p.addColumn(noRepfamName, qualName, Bytes.toBytes("val" + 9998));
+ p.addColumn(NO_REP_FAMILY, QUALIFIER, Bytes.toBytes("val" + 9998));
ht2Source.put(p);
- int rowCount_ht1Source = utility1.countRows(ht1Source);
+ int rowCount_ht1Source = UTIL1.countRows(ht1Source);
assertEquals("t1_syncup has 102 rows on source", 102, rowCount_ht1Source);
- int rowCount_ht2Source = utility1.countRows(ht2Source);
+ int rowCount_ht2Source = UTIL1.countRows(ht2Source);
assertEquals("t2_syncup has 202 rows on source", 202, rowCount_ht2Source);
- utility1.shutdownMiniHBaseCluster();
- utility2.restartHBaseCluster(1);
+ UTIL1.shutdownMiniHBaseCluster();
+ UTIL2.restartHBaseCluster(1);
Thread.sleep(SLEEP_TIME);
// before sync up
- int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
- int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ int rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+ int rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
assertEquals("@Peer1 t1_syncup should be NOT sync up and have 50 rows", 50,
- rowCount_ht1TargetAtPeer1);
+ rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be NOT sync up and have 100 rows", 100,
- rowCount_ht2TargetAtPeer1);
+ rowCountHt2TargetAtPeer1);
// after syun up
for (int i = 0; i < NB_RETRIES; i++) {
- syncUp(utility1);
- rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
- rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+ syncUp(UTIL1);
+ rowCountHt1TargetAtPeer1 = UTIL2.countRows(ht1TargetAtPeer1);
+ rowCountHt2TargetAtPeer1 = UTIL2.countRows(ht2TargetAtPeer1);
if (i == NB_RETRIES - 1) {
- if (rowCount_ht1TargetAtPeer1 != 100 || rowCount_ht2TargetAtPeer1 != 200) {
+ if (rowCountHt1TargetAtPeer1 != 100 || rowCountHt2TargetAtPeer1 != 200) {
// syncUP still failed. Let's look at the source in case anything wrong there
- utility1.restartHBaseCluster(1);
- rowCount_ht1Source = utility1.countRows(ht1Source);
+ UTIL1.restartHBaseCluster(1);
+ rowCount_ht1Source = UTIL1.countRows(ht1Source);
LOG.debug("t1_syncup should have 102 rows at source, and it is " + rowCount_ht1Source);
- rowCount_ht2Source = utility1.countRows(ht2Source);
+ rowCount_ht2Source = UTIL1.countRows(ht2Source);
LOG.debug("t2_syncup should have 202 rows at source, and it is " + rowCount_ht2Source);
}
assertEquals("@Peer1 t1_syncup should be sync up and have 100 rows", 100,
- rowCount_ht1TargetAtPeer1);
+ rowCountHt1TargetAtPeer1);
assertEquals("@Peer1 t2_syncup should be sync up and have 200 rows", 200,
- rowCount_ht2TargetAtPeer1);
+ rowCountHt2TargetAtPeer1);
}
- if (rowCount_ht1TargetAtPeer1 == 100 && rowCount_ht2TargetAtPeer1 == 200) {
+ if (rowCountHt1TargetAtPeer1 == 100 && rowCountHt2TargetAtPeer1 == 200) {
LOG.info("SyncUpAfterPut succeeded at retry = " + i);
break;
} else {
- LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
- + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
- + rowCount_ht2TargetAtPeer1);
+ LOG.debug("SyncUpAfterPut failed at retry = " + i + ", with rowCount_ht1TargetPeer1 =" +
+ rowCountHt1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 =" + rowCountHt2TargetAtPeer1);
}
Thread.sleep(SLEEP_TIME);
}
}
-
- protected void syncUp(HBaseTestingUtility ut) throws Exception {
- ToolRunner.run(ut.getConfiguration(), new ReplicationSyncUp(), new String[0]);
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
new file mode 100644
index 0000000..bf3941d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolBase.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_GLOBAL;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.After;
+import org.junit.Before;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+public abstract class TestReplicationSyncUpToolBase {
+
+ protected static final HBaseTestingUtility UTIL1 = new HBaseTestingUtility();
+ protected static final HBaseTestingUtility UTIL2 = new HBaseTestingUtility();
+
+ protected static final TableName TN1 = TableName.valueOf("t1_syncup");
+ protected static final TableName TN2 = TableName.valueOf("t2_syncup");
+
+ protected static final byte[] FAMILY = Bytes.toBytes("cf1");
+ protected static final byte[] QUALIFIER = Bytes.toBytes("q1");
+
+ protected static final byte[] NO_REP_FAMILY = Bytes.toBytes("norep");
+
+ protected TableDescriptor t1SyncupSource;
+ protected TableDescriptor t1SyncupTarget;
+ protected TableDescriptor t2SyncupSource;
+ protected TableDescriptor t2SyncupTarget;
+
+ protected Connection conn1;
+ protected Connection conn2;
+
+ protected Table ht1Source;
+ protected Table ht2Source;
+ protected Table ht1TargetAtPeer1;
+ protected Table ht2TargetAtPeer1;
+
+ protected void customizeClusterConf(Configuration conf) {
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ customizeClusterConf(UTIL1.getConfiguration());
+ customizeClusterConf(UTIL2.getConfiguration());
+ TestReplicationBase.configureClusters(UTIL1, UTIL2);
+ UTIL1.startMiniZKCluster();
+ UTIL2.setZkCluster(UTIL1.getZkCluster());
+
+ UTIL1.startMiniCluster(2);
+ UTIL2.startMiniCluster(4);
+
+ t1SyncupSource = TableDescriptorBuilder.newBuilder(TN1)
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
+
+ t1SyncupTarget = TableDescriptorBuilder.newBuilder(TN1)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
+
+ t2SyncupSource = TableDescriptorBuilder.newBuilder(TN2)
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setScope(REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
+
+ t2SyncupTarget = TableDescriptorBuilder.newBuilder(TN2)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(NO_REP_FAMILY)).build();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ Closeables.close(ht1Source, true);
+ Closeables.close(ht2Source, true);
+ Closeables.close(ht1TargetAtPeer1, true);
+ Closeables.close(ht2TargetAtPeer1, true);
+ Closeables.close(conn1, true);
+ Closeables.close(conn2, true);
+ UTIL2.shutdownMiniCluster();
+ UTIL1.shutdownMiniCluster();
+ }
+
+  /** Creates the tables on both clusters, opens the connections/tables and adds peer "1". */
+  protected final void setupReplication() throws Exception {
+    Admin admin1 = UTIL1.getAdmin();
+    admin1.createTable(t1SyncupSource);
+    admin1.createTable(t2SyncupSource);
+
+    Admin admin2 = UTIL2.getAdmin();
+    admin2.createTable(t1SyncupTarget);
+    admin2.createTable(t2SyncupTarget);
+
+    // Tables on the source cluster. The connection must go into the conn1 field (a local would
+    // shadow it) so that tearDown() can close it.
+    conn1 = ConnectionFactory.createConnection(UTIL1.getConfiguration());
+    ht1Source = conn1.getTable(TN1);
+    ht2Source = conn1.getTable(TN2);
+
+    // Tables on the peer cluster, again through the conn2 field so the connection gets closed.
+    conn2 = ConnectionFactory.createConnection(UTIL2.getConfiguration());
+    ht1TargetAtPeer1 = conn2.getTable(TN1);
+    ht2TargetAtPeer1 = conn2.getTable(TN2);
+
+    // Replicate from the source cluster (UTIL1) to the peer cluster (UTIL2).
+    ReplicationPeerConfig rpc =
+      ReplicationPeerConfig.newBuilder().setClusterKey(UTIL2.getClusterKey()).build();
+    admin1.addReplicationPeer("1", rpc);
+  }
+
+ protected final void syncUp(HBaseTestingUtility util) throws Exception {
+ ToolRunner.run(util.getConfiguration(), new ReplicationSyncUp(), new String[0]);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
index 594aac0..2aa3ea4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleAsyncWAL.java
@@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleAsyncWAL extends TestReplication
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
- conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+ CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+ CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationEndpoint.setUpBeforeClass();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
index 68b41be..36c07fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationEndpointWithMultipleWAL.java
@@ -36,8 +36,8 @@ public class TestReplicationEndpointWithMultipleWAL extends TestReplicationEndpo
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
- conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
+ CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+ CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationEndpoint.setUpBeforeClass();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
index 4685f24..0f79492 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL.java
@@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleAsyncWAL extends
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
- conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
+ CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+ CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
TestReplicationKillMasterRSCompressed.setUpBeforeClass();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
index 82fef3aa5..21f325c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationKillMasterRSCompressedWithMultipleWAL.java
@@ -37,8 +37,8 @@ public class TestReplicationKillMasterRSCompressedWithMultipleWAL extends
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
- conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
+ CONF1.set(WALFactory.WAL_PROVIDER, "multiwal");
+ CONF1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
TestReplicationKillMasterRSCompressed.setUpBeforeClass();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
index 1451499..b2835ee 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleAsyncWAL.java
@@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hbase.replication.multiwal;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@@ -35,10 +34,9 @@ public class TestReplicationSyncUpToolWithMultipleAsyncWAL extends TestReplicati
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleAsyncWAL.class);
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
- conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
- TestReplicationBase.setUpBeforeClass();
+ @Override
+ protected void customizeClusterConf(Configuration conf) {
+ conf.set(WALFactory.WAL_PROVIDER, "multiwal");
+ conf.set(RegionGroupingProvider.DELEGATE_PROVIDER, "asyncfs");
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
index e487039..a5dbaf3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/multiwal/TestReplicationSyncUpToolWithMultipleWAL.java
@@ -17,14 +17,13 @@
*/
package org.apache.hadoop.hbase.replication.multiwal;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.replication.TestReplicationSyncUpTool;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.wal.RegionGroupingProvider;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
@@ -33,12 +32,11 @@ public class TestReplicationSyncUpToolWithMultipleWAL extends TestReplicationSyn
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleWAL.class);
+ HBaseClassTestRule.forClass(TestReplicationSyncUpToolWithMultipleWAL.class);
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf1.set(WALFactory.WAL_PROVIDER, "multiwal");
- conf1.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
- TestReplicationBase.setUpBeforeClass();
+ @Override
+ protected void customizeClusterConf(Configuration conf) {
+ conf.set(WALFactory.WAL_PROVIDER, "multiwal");
+ conf.set(RegionGroupingProvider.DELEGATE_PROVIDER, "filesystem");
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 24329a0..bff363f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -59,19 +59,19 @@ public class TestReplicator extends TestReplicationBase {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
// Set RPC size limit to 10kb (will be applied to both source and sink clusters)
- conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+ CONF1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
TestReplicationBase.setUpBeforeClass();
}
@Test
public void testReplicatorBatching() throws Exception {
// Clear the tables
- truncateTable(utility1, tableName);
- truncateTable(utility2, tableName);
+ truncateTable(UTIL1, tableName);
+ truncateTable(UTIL2, tableName);
// Replace the peer set up for us by the base class with a wrapper for this test
admin.addPeer("testReplicatorBatching",
- new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+ new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
.setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
null);
@@ -92,7 +92,7 @@ public class TestReplicator extends TestReplicationBase {
}
// Wait for replication to complete.
- Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+ Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
LOG.info("Count=" + ReplicationEndpointForTest.getBatchCount());
@@ -107,7 +107,7 @@ public class TestReplicator extends TestReplicationBase {
assertEquals("We sent an incorrect number of batches", NUM_ROWS,
ReplicationEndpointForTest.getBatchCount());
- assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
+ assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
} finally {
admin.removePeer("testReplicatorBatching");
}
@@ -116,12 +116,12 @@ public class TestReplicator extends TestReplicationBase {
@Test
public void testReplicatorWithErrors() throws Exception {
// Clear the tables
- truncateTable(utility1, tableName);
- truncateTable(utility2, tableName);
+ truncateTable(UTIL1, tableName);
+ truncateTable(UTIL2, tableName);
// Replace the peer set up for us by the base class with a wrapper for this test
admin.addPeer("testReplicatorWithErrors",
- new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+ new ReplicationPeerConfig().setClusterKey(UTIL2.getClusterKey())
.setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
null);
@@ -143,7 +143,7 @@ public class TestReplicator extends TestReplicationBase {
// Wait for replication to complete.
// We can expect 10 batches
- Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+ Waiter.waitFor(CONF1, 60000, new Waiter.ExplainingPredicate<Exception>() {
@Override
public boolean evaluate() throws Exception {
return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
@@ -155,7 +155,7 @@ public class TestReplicator extends TestReplicationBase {
}
});
- assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
+ assertEquals("We did not replicate enough rows", NUM_ROWS, UTIL2.countRows(htable2));
} finally {
admin.removePeer("testReplicatorWithErrors");
}