You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/01/03 05:17:13 UTC
[hbase] branch branch-2 updated: HBASE-23098 [bulkload] If one of
the peers in a cluster is configured with NAMESPACE level,
its hfile-refs(zk) will be backlogged (#676)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 381ff85 HBASE-23098 [bulkload] If one of the peers in a cluster is configured with NAMESPACE level, its hfile-refs(zk) will be backlogged (#676)
381ff85 is described below
commit 381ff85e3d4a824b2d3f2532dab5136b65820f21
Author: Yiran Wu <yi...@gmail.com>
AuthorDate: Fri Jan 3 13:10:42 2020 +0800
HBASE-23098 [bulkload] If one of the peers in a cluster is configured with NAMESPACE level, its hfile-refs(zk) will be backlogged (#676)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
Signed-off-by: stack <st...@apache.org>
---
.../regionserver/ReplicationSource.java | 15 +-
.../regionserver/TestBulkLoadReplication.java | 32 ++-
...TestNamespaceReplicationWithBulkLoadedData.java | 290 +++++++++++++++++++++
.../hbase/replication/TestReplicationBase.java | 5 +
4 files changed, 329 insertions(+), 13 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ee423e0..155f08c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,6 +28,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -224,8 +225,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
@Override
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
+ String peerId = replicationPeer.getId();
+ Set<String> namespaces = replicationPeer.getNamespaces();
Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
- if (tableCFMap != null) {
+ if (tableCFMap != null) { // All peers with TableCFs
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
@@ -233,7 +236,15 @@ public class ReplicationSource implements ReplicationSourceInterface {
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
- tableName, Bytes.toString(family), peerId);
+ tableName, Bytes.toString(family), peerId);
+ }
} else if (namespaces != null) { // Peers with only NAMESPACES configured
+ if (namespaces.contains(tableName.getNamespaceAsString())) {
+ this.queueStorage.addHFileRefs(peerId, pairs);
+ metrics.incrSizeOfHFileRefsQueue(pairs.size());
+ } else {
+ LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+ tableName, Bytes.toString(family), peerId);
}
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 6fd7288..063c70b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -122,8 +123,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
private static AtomicInteger BULK_LOADS_COUNT;
private static CountDownLatch BULK_LOAD_LATCH;
- private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
- private static final Configuration CONF3 = UTIL3.getConfiguration();
+ protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
+ protected static final Configuration CONF3 = UTIL3.getConfiguration();
private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
@@ -220,7 +221,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
}
- private static void setupBulkLoadConfigsForCluster(Configuration config,
+ protected static void setupBulkLoadConfigsForCluster(Configuration config,
String clusterReplicationId) throws Exception {
config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
@@ -238,13 +239,16 @@ public class TestBulkLoadReplication extends TestReplicationBase {
Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
byte[] row = Bytes.toBytes("001");
byte[] value = Bytes.toBytes("v1");
- assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable);
+ assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
+ peer2TestTable, peer3TestTable);
row = Bytes.toBytes("002");
value = Bytes.toBytes("v2");
- assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable);
+ assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable,
+ peer2TestTable, peer3TestTable);
row = Bytes.toBytes("003");
value = Bytes.toBytes("v3");
- assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable);
+ assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable,
+ peer2TestTable, peer3TestTable);
//Additional wait to make sure no extra bulk load happens
Thread.sleep(400);
//We have 3 bulk load events (1 initiated on each cluster).
@@ -278,18 +282,18 @@ public class TestBulkLoadReplication extends TestReplicationBase {
}
- private void assertBulkLoadConditions(byte[] row, byte[] value,
+ protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
HBaseTestingUtility utility, Table...tables) throws Exception {
BULK_LOAD_LATCH = new CountDownLatch(3);
- bulkLoadOnCluster(row, value, utility);
+ bulkLoadOnCluster(tableName, row, value, utility);
assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
assertTableHasValue(tables[0], row, value);
assertTableHasValue(tables[1], row, value);
assertTableHasValue(tables[2], row, value);
}
- private void bulkLoadOnCluster(byte[] row, byte[] value,
- HBaseTestingUtility cluster) throws Exception {
+ protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
+ HBaseTestingUtility cluster) throws Exception {
String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
@@ -302,13 +306,19 @@ public class TestBulkLoadReplication extends TestReplicationBase {
cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
}
- private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+ protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
Get get = new Get(row);
Result result = table.get(get);
assertTrue(result.advance());
assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
}
+ protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
+ Get get = new Get(row);
+ Result result = table.get(get);
+ assertTrue(result.isEmpty());
+ }
+
private String createHFileForFamilies(byte[] row, byte[] value,
Configuration clusterConfig) throws IOException {
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java
new file mode 100644
index 0000000..48790b3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testcase for HBASE-23098
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class);
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);
+
+ private static final HBaseTestingUtility UTIL4 = new HBaseTestingUtility();
+ private static final String PEER4_CLUSTER_ID = "peer4";
+ private static final String PEER4_NS = "ns_peer1";
+ private static final String PEER4_NS_TABLE = "ns_peer2";
+
+ private static final Configuration CONF4 = UTIL4.getConfiguration();
+
+ private static final String NS1 = "ns1";
+ private static final String NS2 = "ns2";
+
+ private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
+ private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
+ setupConfig(UTIL4, "/4");
+ TestBulkLoadReplication.setUpBeforeClass();
+ startFourthCluster();
+ }
+
+ private static void startFourthCluster() throws Exception {
+ LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
+ UTIL4.setZkCluster(UTIL1.getZkCluster());
+ UTIL4.startMiniCluster(NUM_SLAVES1);
+
+ TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+ Connection connection4 = ConnectionFactory.createConnection(CONF4);
+ try (Admin admin4 = connection4.getAdmin()) {
+ admin4.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ }
+ UTIL4.waitUntilAllRegionsAssigned(tableName);
+ }
+
+ @Before
+ @Override
+ public void setUpBase() throws Exception {
+ /** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
+ * and this test add the fourth cluster.
+ * So we have following topology:
+ * 1
+ * / \
+ * 2 4
+ * /
+ * 3
+ *
+ * The 1 -> 4 has two peers,
+ * ns_peer1: ns1 -> ns1 (validate this peer's hfile-refs)
+ * ns_peer1 configuration is NAMESPACES => ["ns1"]
+ *
+ * ns_peer2: ns2:t2_syncup -> ns2:t2_syncup
+ * ns_peer2 configuration is NAMESPACES => ["ns2"],
+ * TABLE_CFS => { "ns2:t2_syncup" => []}
+ *
+ * The 1 -> 2 has one peer, this peer configuration is
+ * add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
+ *
+ */
+ super.setUpBase();
+
+ // Create tables
+ TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+ TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+ Admin admin1 = UTIL1.getAdmin();
+ admin1.createNamespace(NamespaceDescriptor.create(NS1).build());
+ admin1.createNamespace(NamespaceDescriptor.create(NS2).build());
+ admin1.createTable(table1);
+ admin1.createTable(table2);
+
+ Admin admin2 = UTIL2.getAdmin();
+ admin2.createNamespace(NamespaceDescriptor.create(NS1).build());
+ admin2.createNamespace(NamespaceDescriptor.create(NS2).build());
+ admin2.createTable(table1);
+ admin2.createTable(table2);
+
+ Admin admin3 = UTIL3.getAdmin();
+ admin3.createNamespace(NamespaceDescriptor.create(NS1).build());
+ admin3.createNamespace(NamespaceDescriptor.create(NS2).build());
+ admin3.createTable(table1);
+ admin3.createTable(table2);
+
+ Admin admin4 = UTIL4.getAdmin();
+ admin4.createNamespace(NamespaceDescriptor.create(NS1).build());
+ admin4.createNamespace(NamespaceDescriptor.create(NS2).build());
+ admin4.createTable(table1);
+ admin4.createTable(table2);
+
+ /**
+ * Set ns_peer1 1: ns1 -> 4: ns1
+ *
+ * add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+ * NAMESPACES => ["ns1"]
+ */
+ Set<String> namespaces = new HashSet<>();
+ namespaces.add(NS1);
+ ReplicationPeerConfig rpc4_ns =
+ ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
+ .setReplicateAllUserTables(false).setNamespaces(namespaces).build();
+ admin1.addReplicationPeer(PEER4_NS, rpc4_ns);
+
+ /**
+ * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
+ *
+ * add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+ * NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
+ */
+ Map<TableName, List<String>> tableCFsMap = new HashMap<>();
+ tableCFsMap.put(NS2_TABLE, null);
+ ReplicationPeerConfig rpc4_ns_table =
+ ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
+ .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
+ admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table);
+ }
+
+ @After
+ @Override
+ public void tearDownBase() throws Exception {
+ super.tearDownBase();
+ TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+ TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(famName)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+ Admin admin1 = UTIL1.getAdmin();
+ admin1.disableTable(table1.getTableName());
+ admin1.deleteTable(table1.getTableName());
+ admin1.disableTable(table2.getTableName());
+ admin1.deleteTable(table2.getTableName());
+ admin1.deleteNamespace(NS1);
+ admin1.deleteNamespace(NS2);
+
+ Admin admin2 = UTIL2.getAdmin();
+ admin2.disableTable(table1.getTableName());
+ admin2.deleteTable(table1.getTableName());
+ admin2.disableTable(table2.getTableName());
+ admin2.deleteTable(table2.getTableName());
+ admin2.deleteNamespace(NS1);
+ admin2.deleteNamespace(NS2);
+
+ Admin admin3 = UTIL3.getAdmin();
+ admin3.disableTable(table1.getTableName());
+ admin3.deleteTable(table1.getTableName());
+ admin3.disableTable(table2.getTableName());
+ admin3.deleteTable(table2.getTableName());
+ admin3.deleteNamespace(NS1);
+ admin3.deleteNamespace(NS2);
+
+ Admin admin4 = UTIL4.getAdmin();
+ admin4.disableTable(table1.getTableName());
+ admin4.deleteTable(table1.getTableName());
+ admin4.disableTable(table2.getTableName());
+ admin4.deleteTable(table2.getTableName());
+ admin4.deleteNamespace(NS1);
+ admin4.deleteNamespace(NS2);
+ UTIL1.getAdmin().removeReplicationPeer(PEER4_NS);
+ UTIL1.getAdmin().removeReplicationPeer(PEER4_NS_TABLE);
+ }
+
+ @Test
+ @Override
+ public void testBulkLoadReplicationActiveActive() throws Exception {
+ Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+ Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
+ Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
+ Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE);
+
+ // case1: The ns1 tables will be replicated to cluster4
+ byte[] row = Bytes.toBytes("002_ns_peer");
+ byte[] value = Bytes.toBytes("v2");
+ bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
+ waitForReplication(ns1Table, 1, NB_RETRIES);
+ assertTableHasValue(ns1Table, row, value);
+
+ // case2: The ns2:t2_syncup will be replicated to cluster4
+ // If HBASE-23098 is not fixed, the ns_peer1's hfile-refs(zk) will be backlogged
+ row = Bytes.toBytes("003_ns_table_peer");
+ value = Bytes.toBytes("v2");
+ bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
+ waitForReplication(ns2Table, 1, NB_RETRIES);
+ assertTableHasValue(ns2Table, row, value);
+
+ // case3: The table test will be replicated to cluster1,cluster2,cluster3
+ // but not to cluster4, because we did not set another peer for that table.
+ row = Bytes.toBytes("001_nopeer");
+ value = Bytes.toBytes("v1");
+ assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
+ peer2TestTable, peer3TestTable);
+ assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty
+
+ // Verify hfile-refs for 1:ns_peer1, expect is empty
+ MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster();
+ ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null);
+ RecoverableZooKeeper zk = ZKUtil.connect(UTIL1.getConfiguration(), watcher);
+ ZKReplicationQueueStorage replicationQueueStorage =
+ new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration());
+ Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
+ assertTrue(hfiles.isEmpty());
+ }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index df44486..2e87282 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -132,6 +132,11 @@ public class TestReplicationBase {
protected static void waitForReplication(int expectedRows, int retries)
throws IOException, InterruptedException {
+ waitForReplication(htable2, expectedRows, retries);
+ }
+
+ protected static void waitForReplication(Table htable2, int expectedRows, int retries)
+ throws IOException, InterruptedException {
Scan scan;
for (int i = 0; i < retries; i++) {
scan = new Scan();