You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2020/01/03 05:17:13 UTC

[hbase] branch branch-2 updated: HBASE-23098 [bulkload] If one of the peers in a cluster is configured with NAMESPACE level, its hfile-refs(zk) will be backlogged (#676)

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 381ff85  HBASE-23098 [bulkload] If one of the peers in a cluster is configured with NAMESPACE level, its hfile-refs(zk) will be backlogged (#676)
381ff85 is described below

commit 381ff85e3d4a824b2d3f2532dab5136b65820f21
Author: Yiran Wu <yi...@gmail.com>
AuthorDate: Fri Jan 3 13:10:42 2020 +0800

    HBASE-23098 [bulkload] If one of the peers in a cluster is configured with NAMESPACE level, its hfile-refs(zk) will be backlogged (#676)
    
     Signed-off-by: Wellington Chevreuil <wc...@apache.org>
     Signed-off-by: stack <st...@apache.org>
---
 .../regionserver/ReplicationSource.java            |  15 +-
 .../regionserver/TestBulkLoadReplication.java      |  32 ++-
 ...TestNamespaceReplicationWithBulkLoadedData.java | 290 +++++++++++++++++++++
 .../hbase/replication/TestReplicationBase.java     |   5 +
 4 files changed, 329 insertions(+), 13 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index ee423e0..155f08c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,6 +28,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -224,8 +225,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
   @Override
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
+    String peerId = replicationPeer.getId();
+    Set<String> namespaces = replicationPeer.getNamespaces();
     Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
-    if (tableCFMap != null) {
+    if (tableCFMap != null) { // All peers with TableCFs
       List<String> tableCfs = tableCFMap.get(tableName);
       if (tableCFMap.containsKey(tableName)
           && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
@@ -233,7 +236,15 @@ public class ReplicationSource implements ReplicationSourceInterface {
         metrics.incrSizeOfHFileRefsQueue(pairs.size());
       } else {
         LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-          tableName, Bytes.toString(family), peerId);
+            tableName, Bytes.toString(family), peerId);
+      }
+    } else if (namespaces != null) { // Only for set NAMESPACES peers
+      if (namespaces.contains(tableName.getNamespaceAsString())) {
+        this.queueStorage.addHFileRefs(peerId, pairs);
+        metrics.incrSizeOfHFileRefsQueue(pairs.size());
+      } else {
+        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+            tableName, Bytes.toString(family), peerId);
       }
     } else {
       // user has explicitly not defined any table cfs for replication, means replicate all the
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 6fd7288..063c70b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
@@ -122,8 +123,8 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   private static AtomicInteger BULK_LOADS_COUNT;
   private static CountDownLatch BULK_LOAD_LATCH;
 
-  private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
-  private static final Configuration CONF3 = UTIL3.getConfiguration();
+  protected static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
+  protected static final Configuration CONF3 = UTIL3.getConfiguration();
 
   private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
 
@@ -220,7 +221,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
   }
 
-  private static void setupBulkLoadConfigsForCluster(Configuration config,
+  protected static void setupBulkLoadConfigsForCluster(Configuration config,
     String clusterReplicationId) throws Exception {
     config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
@@ -238,13 +239,16 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
     byte[] row = Bytes.toBytes("001");
     byte[] value = Bytes.toBytes("v1");
-    assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable);
+    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
+        peer2TestTable, peer3TestTable);
     row = Bytes.toBytes("002");
     value = Bytes.toBytes("v2");
-    assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable);
+    assertBulkLoadConditions(tableName, row, value, UTIL2, peer1TestTable,
+        peer2TestTable, peer3TestTable);
     row = Bytes.toBytes("003");
     value = Bytes.toBytes("v3");
-    assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable);
+    assertBulkLoadConditions(tableName, row, value, UTIL3, peer1TestTable,
+        peer2TestTable, peer3TestTable);
     //Additional wait to make sure no extra bulk load happens
     Thread.sleep(400);
     //We have 3 bulk load events (1 initiated on each cluster).
@@ -278,18 +282,18 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   }
 
 
-  private void assertBulkLoadConditions(byte[] row, byte[] value,
+  protected void assertBulkLoadConditions(TableName tableName, byte[] row, byte[] value,
       HBaseTestingUtility utility, Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
-    bulkLoadOnCluster(row, value, utility);
+    bulkLoadOnCluster(tableName, row, value, utility);
     assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
     assertTableHasValue(tables[0], row, value);
     assertTableHasValue(tables[1], row, value);
     assertTableHasValue(tables[2], row, value);
   }
 
-  private void bulkLoadOnCluster(byte[] row, byte[] value,
-      HBaseTestingUtility cluster) throws Exception {
+  protected void bulkLoadOnCluster(TableName tableName, byte[] row, byte[] value,
+                                 HBaseTestingUtility cluster) throws Exception {
     String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
     copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
     BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
@@ -302,13 +306,19 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
   }
 
-  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+  protected void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
     Get get = new Get(row);
     Result result = table.get(get);
     assertTrue(result.advance());
     assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
   }
 
+  protected void assertTableNoValue(Table table, byte[] row, byte[] value) throws Exception {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertTrue(result.isEmpty());
+  }
+
   private String createHFileForFamilies(byte[] row, byte[] value,
       Configuration clusterConfig) throws IOException {
     CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java
new file mode 100644
index 0000000..48790b3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplicationWithBulkLoadedData.java
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertTrue;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.regionserver.TestBulkLoadReplication;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Testcase for HBASE-23098
+ */
+@Category({ ReplicationTests.class, MediumTests.class })
+public final class TestNamespaceReplicationWithBulkLoadedData extends TestBulkLoadReplication {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestNamespaceReplicationWithBulkLoadedData.class);
+  private static final Logger LOG =
+      LoggerFactory.getLogger(TestNamespaceReplicationWithBulkLoadedData.class);
+
+  private static final HBaseTestingUtility UTIL4 = new HBaseTestingUtility();
+  private static final String PEER4_CLUSTER_ID = "peer4";
+  private static final String PEER4_NS = "ns_peer1";
+  private static final String PEER4_NS_TABLE = "ns_peer2";
+
+  private static final Configuration CONF4 = UTIL4.getConfiguration();
+
+  private static final String NS1 = "ns1";
+  private static final String NS2 = "ns2";
+
+  private static final TableName NS1_TABLE = TableName.valueOf(NS1 + ":t1_syncup");
+  private static final TableName NS2_TABLE = TableName.valueOf(NS2 + ":t2_syncup");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setupBulkLoadConfigsForCluster(CONF4, PEER4_CLUSTER_ID);
+    setupConfig(UTIL4, "/4");
+    TestBulkLoadReplication.setUpBeforeClass();
+    startFourthCluster();
+  }
+
+  private static void startFourthCluster() throws Exception {
+    LOG.info("Setup Zk to same one from UTIL1 and UTIL2 and UTIL3");
+    UTIL4.setZkCluster(UTIL1.getZkCluster());
+    UTIL4.startMiniCluster(NUM_SLAVES1);
+
+    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+            .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    Connection connection4 = ConnectionFactory.createConnection(CONF4);
+    try (Admin admin4 = connection4.getAdmin()) {
+      admin4.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    UTIL4.waitUntilAllRegionsAssigned(tableName);
+  }
+
+  @Before
+  @Override
+  public void setUpBase() throws Exception {
+    /** "super.setUpBase()" already sets peer1 from 1 <-> 2 <-> 3
+     * and this test adds the fourth cluster.
+     * So we have the following topology:
+     *      1
+     *     / \
+     *    2   4
+     *   /
+     *  3
+     *
+     *  The 1 -> 4 has two peers,
+     *  ns_peer1:  ns1 -> ns1 (validate this peer hfile-refs)
+     *             ns_peer1 configuration is NAMESPACES => ["ns1"]
+     *
+     *  ns_peer2:  ns2:t2_syncup -> ns2:t2_syncup
+     *             ns_peer2 configuration is NAMESPACES => ["ns2"],
+     *                       TABLE_CFS => { "ns2:t2_syncup" => []}
+     *
+     *  The 1 -> 2 has one peer, this peer configuration is
+     *             add_peer '2', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
+     *
+     */
+    super.setUpBase();
+
+    // Create tables
+    TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
+        .setColumnFamily(
+            ColumnFamilyDescriptorBuilder.newBuilder(famName)
+                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
+        .setColumnFamily(
+            ColumnFamilyDescriptorBuilder.newBuilder(famName)
+                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    Admin admin1 = UTIL1.getAdmin();
+    admin1.createNamespace(NamespaceDescriptor.create(NS1).build());
+    admin1.createNamespace(NamespaceDescriptor.create(NS2).build());
+    admin1.createTable(table1);
+    admin1.createTable(table2);
+
+    Admin admin2 = UTIL2.getAdmin();
+    admin2.createNamespace(NamespaceDescriptor.create(NS1).build());
+    admin2.createNamespace(NamespaceDescriptor.create(NS2).build());
+    admin2.createTable(table1);
+    admin2.createTable(table2);
+
+    Admin admin3 = UTIL3.getAdmin();
+    admin3.createNamespace(NamespaceDescriptor.create(NS1).build());
+    admin3.createNamespace(NamespaceDescriptor.create(NS2).build());
+    admin3.createTable(table1);
+    admin3.createTable(table2);
+
+    Admin admin4 = UTIL4.getAdmin();
+    admin4.createNamespace(NamespaceDescriptor.create(NS1).build());
+    admin4.createNamespace(NamespaceDescriptor.create(NS2).build());
+    admin4.createTable(table1);
+    admin4.createTable(table2);
+
+    /**
+     *  Set ns_peer1 1: ns1 -> 4: ns1
+     *
+     *  add_peer 'ns_peer1', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+     *     NAMESPACES => ["ns1"]
+     */
+    Set<String> namespaces = new HashSet<>();
+    namespaces.add(NS1);
+    ReplicationPeerConfig rpc4_ns =
+        ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
+            .setReplicateAllUserTables(false).setNamespaces(namespaces).build();
+    admin1.addReplicationPeer(PEER4_NS, rpc4_ns);
+
+    /**
+     * Set ns_peer2 1: ns2:t2_syncup -> 4: ns2:t2_syncup
+     *
+     * add_peer 'ns_peer2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
+     *          NAMESPACES => ["ns2"], TABLE_CFS => { "ns2:t2_syncup" => [] }
+     */
+    Map<TableName, List<String>> tableCFsMap = new HashMap<>();
+    tableCFsMap.put(NS2_TABLE, null);
+    ReplicationPeerConfig rpc4_ns_table =
+        ReplicationPeerConfig.newBuilder().setClusterKey(UTIL4.getClusterKey())
+            .setReplicateAllUserTables(false).setTableCFsMap(tableCFsMap).build();
+    admin1.addReplicationPeer(PEER4_NS_TABLE, rpc4_ns_table);
+  }
+
+  @After
+  @Override
+  public void tearDownBase() throws Exception {
+    super.tearDownBase();
+    TableDescriptor table1 = TableDescriptorBuilder.newBuilder(NS1_TABLE)
+        .setColumnFamily(
+            ColumnFamilyDescriptorBuilder.newBuilder(famName)
+                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    TableDescriptor table2 = TableDescriptorBuilder.newBuilder(NS2_TABLE)
+        .setColumnFamily(
+            ColumnFamilyDescriptorBuilder.newBuilder(famName)
+                .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+    Admin admin1 = UTIL1.getAdmin();
+    admin1.disableTable(table1.getTableName());
+    admin1.deleteTable(table1.getTableName());
+    admin1.disableTable(table2.getTableName());
+    admin1.deleteTable(table2.getTableName());
+    admin1.deleteNamespace(NS1);
+    admin1.deleteNamespace(NS2);
+
+    Admin admin2 = UTIL2.getAdmin();
+    admin2.disableTable(table1.getTableName());
+    admin2.deleteTable(table1.getTableName());
+    admin2.disableTable(table2.getTableName());
+    admin2.deleteTable(table2.getTableName());
+    admin2.deleteNamespace(NS1);
+    admin2.deleteNamespace(NS2);
+
+    Admin admin3 = UTIL3.getAdmin();
+    admin3.disableTable(table1.getTableName());
+    admin3.deleteTable(table1.getTableName());
+    admin3.disableTable(table2.getTableName());
+    admin3.deleteTable(table2.getTableName());
+    admin3.deleteNamespace(NS1);
+    admin3.deleteNamespace(NS2);
+
+    Admin admin4 = UTIL4.getAdmin();
+    admin4.disableTable(table1.getTableName());
+    admin4.deleteTable(table1.getTableName());
+    admin4.disableTable(table2.getTableName());
+    admin4.deleteTable(table2.getTableName());
+    admin4.deleteNamespace(NS1);
+    admin4.deleteNamespace(NS2);
+    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS);
+    UTIL1.getAdmin().removeReplicationPeer(PEER4_NS_TABLE);
+  }
+
+  @Test
+  @Override
+  public void testBulkLoadReplicationActiveActive() throws Exception {
+    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+    Table notPeerTable = UTIL4.getConnection().getTable(TestReplicationBase.tableName);
+    Table ns1Table = UTIL4.getConnection().getTable(NS1_TABLE);
+    Table ns2Table = UTIL4.getConnection().getTable(NS2_TABLE);
+
+    // case1: The ns1 tables will be replicated to cluster4
+    byte[] row = Bytes.toBytes("002_ns_peer");
+    byte[] value = Bytes.toBytes("v2");
+    bulkLoadOnCluster(ns1Table.getName(), row, value, UTIL1);
+    waitForReplication(ns1Table, 1, NB_RETRIES);
+    assertTableHasValue(ns1Table, row, value);
+
+    // case2: The ns2:t2_syncup table will be replicated to cluster4
+    // Without the HBASE-23098 fix, ns_peer1's hfile-refs (zk) would be backlogged
+    row = Bytes.toBytes("003_ns_table_peer");
+    value = Bytes.toBytes("v2");
+    bulkLoadOnCluster(ns2Table.getName(), row, value, UTIL1);
+    waitForReplication(ns2Table, 1, NB_RETRIES);
+    assertTableHasValue(ns2Table, row, value);
+
+    // case3: The test table will be replicated to cluster1, cluster2 and cluster3, but
+    //        not to cluster4, because no peer to cluster4 is configured for that table.
+    row = Bytes.toBytes("001_nopeer");
+    value = Bytes.toBytes("v1");
+    assertBulkLoadConditions(tableName, row, value, UTIL1, peer1TestTable,
+        peer2TestTable, peer3TestTable);
+    assertTableNoValue(notPeerTable, row, value); // 1 -> 4, table is empty
+
+    // Verify hfile-refs for 1:ns_peer1; they are expected to be empty
+    MiniZooKeeperCluster zkCluster = UTIL1.getZkCluster();
+    ZKWatcher watcher = new ZKWatcher(UTIL1.getConfiguration(), "TestZnodeHFiles-refs", null);
+    RecoverableZooKeeper zk = ZKUtil.connect(UTIL1.getConfiguration(), watcher);
+    ZKReplicationQueueStorage replicationQueueStorage =
+        new ZKReplicationQueueStorage(watcher, UTIL1.getConfiguration());
+    Set<String> hfiles = replicationQueueStorage.getAllHFileRefs();
+    assertTrue(hfiles.isEmpty());
+  }
+}
\ No newline at end of file
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index df44486..2e87282 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -132,6 +132,11 @@ public class TestReplicationBase {
 
   protected static void waitForReplication(int expectedRows, int retries)
       throws IOException, InterruptedException {
+    waitForReplication(htable2, expectedRows, retries);
+  }
+
+  protected static void waitForReplication(Table htable2, int expectedRows, int retries)
+      throws IOException, InterruptedException {
     Scan scan;
     for (int i = 0; i < retries; i++) {
       scan = new Scan();