You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/26 01:50:56 UTC

[hbase] branch master updated: HBASE-25590 Bulkload replication HFileRefs cannot be cleared in some cases where set exclude-namespace/exclude-table-cfs (#2969)

This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d0de96  HBASE-25590 Bulkload replication HFileRefs cannot be cleared in some cases where set exclude-namespace/exclude-table-cfs (#2969)
8d0de96 is described below

commit 8d0de969765c0b27991e85b09b52c240321ce881
Author: XinSun <dd...@gmail.com>
AuthorDate: Fri Feb 26 09:50:23 2021 +0800

    HBASE-25590 Bulkload replication HFileRefs cannot be cleared in some cases where set exclude-namespace/exclude-table-cfs (#2969)
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
 .../hbase/replication/ReplicationPeerConfig.java   |  29 +-
 .../replication/TestReplicationPeerConfig.java     | 366 ++++++++++++---------
 .../NamespaceTableCfWALEntryFilter.java            |  84 +----
 .../regionserver/ReplicationSource.java            |  28 +-
 .../regionserver/TestBulkLoadReplication.java      |   8 +-
 .../TestBulkLoadReplicationHFileRefs.java          | 310 +++++++++++++++++
 6 files changed, 557 insertions(+), 268 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 5ca5cef..3b03ae4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.yetus.audience.InterfaceAudience;
 
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
 /**
  * A configuration for the replication peer cluster.
  */
@@ -301,6 +303,19 @@ public class ReplicationPeerConfig {
    * @return true if the table need replicate to the peer cluster
    */
   public boolean needToReplicate(TableName table) {
+    return needToReplicate(table, null);
+  }
+
+  /**
+   * Decide whether the passed family of the table need replicate to the peer cluster according to
+   * this peer config.
+   * @param table name of the table
+   * @param family family name
+   * @return true if (the family of) the table need replicate to the peer cluster.
+   *         If passed family is null, return true if any CFs of the table need replicate;
+   *         If passed family is not null, return true if the passed family need replicate.
+   */
+  public boolean needToReplicate(TableName table, byte[] family) {
     String namespace = table.getNamespaceAsString();
     if (replicateAllUserTables) {
       // replicate all user tables, but filter by exclude namespaces and table-cfs config
@@ -312,9 +327,12 @@ public class ReplicationPeerConfig {
         return true;
       }
       Collection<String> cfs = excludeTableCFsMap.get(table);
-      // if cfs is null or empty then we can make sure that we do not need to replicate this table,
+      // If cfs is null or empty then we can make sure that we do not need to replicate this table,
       // otherwise, we may still need to replicate the table but filter out some families.
-      return cfs != null && !cfs.isEmpty();
+      return cfs != null && !cfs.isEmpty()
+        // If exclude-table-cfs contains passed family then we make sure that we do not need to
+        // replicate this family.
+        && (family == null || !cfs.contains(Bytes.toString(family)));
     } else {
       // Not replicate all user tables, so filter by namespaces and table-cfs config
       if (namespaces == null && tableCFsMap == null) {
@@ -325,7 +343,12 @@ public class ReplicationPeerConfig {
       if (namespaces != null && namespaces.contains(namespace)) {
         return true;
       }
-      return tableCFsMap != null && tableCFsMap.containsKey(table);
+      // If table-cfs contains this table then we can make sure that we need replicate some CFs of
+      // this table. Further we need all CFs if tableCFsMap.get(table) is null or empty.
+      return tableCFsMap != null && tableCFsMap.containsKey(table)
+        && (family == null || CollectionUtils.isEmpty(tableCFsMap.get(table))
+        // If table-cfs must contain passed family then we need to replicate this family.
+        || tableCFsMap.get(table).contains(Bytes.toString(family)));
     }
   }
 }
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
index d67a3f8..ae2d426 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
@@ -17,21 +17,26 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.util.HashMap;
-import java.util.HashSet;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
+
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.BuilderStyleTest;
-import org.junit.Assert;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
 @Category({ClientTests.class, SmallTests.class})
 public class TestReplicationPeerConfig {
 
@@ -39,8 +44,12 @@ public class TestReplicationPeerConfig {
   public static final HBaseClassTestRule CLASS_RULE =
       HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
 
-  private static TableName TABLE_A = TableName.valueOf("replication", "testA");
-  private static TableName TABLE_B = TableName.valueOf("replication", "testB");
+  private static final String NAMESPACE_REPLICATE = "replicate";
+  private static final String NAMESPACE_OTHER = "other";
+  private static final TableName TABLE_A = TableName.valueOf(NAMESPACE_REPLICATE, "testA");
+  private static final TableName TABLE_B = TableName.valueOf(NAMESPACE_REPLICATE, "testB");
+  private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
+  private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
 
   @Test
   public void testClassMethodsAreBuilderStyle() {
@@ -61,193 +70,230 @@ public class TestReplicationPeerConfig {
 
   @Test
   public void testNeedToReplicateWithReplicatingAll() {
-    ReplicationPeerConfig peerConfig;
-    ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
-      new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
-    Map<TableName, List<String>> tableCfs = new HashMap<>();
-    Set<String> namespaces = new HashSet<>();
-
     // 1. replication_all flag is true, no namespaces and table-cfs config
-    builder.setReplicateAllUserTables(true);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
 
     // 2. replicate_all flag is true, and config in excludedTableCfs
-    builder.setExcludeNamespaces(null);
-    // empty map
-    tableCfs = new HashMap<>();
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeTableCFsMap(tableCfs);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
-    // table testB
-    tableCfs = new HashMap<>();
-    tableCfs.put(TABLE_B, null);
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeTableCFsMap(tableCfs);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+    // Exclude empty table-cfs map
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(Maps.newHashMap())
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
 
-    // table testA
-    tableCfs = new HashMap<>();
-    tableCfs.put(TABLE_A, null);
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeTableCFsMap(tableCfs);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+    // Exclude table B
+    Map<TableName, List<String>> tableCfs = Maps.newHashMap();
+    tableCfs.put(TABLE_B, null);
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(tableCfs)
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+    assertFalse(peerConfig.needToReplicate(TABLE_B));
 
     // 3. replicate_all flag is true, and config in excludeNamespaces
-    builder.setExcludeTableCFsMap(null);
-    // empty set
-    namespaces = new HashSet<>();
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
-    // namespace default
-    namespaces = new HashSet<>();
-    namespaces.add("default");
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
-    // namespace replication
-    namespaces = new HashSet<>();
-    namespaces.add("replication");
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+    // Exclude empty namespace set
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeNamespaces(Sets.newHashSet())
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+    // Exclude namespace other
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+    // Exclude namespace replication
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
 
     // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
     // Namespaces config doesn't conflict with table-cfs config
-    namespaces = new HashSet<>();
-    tableCfs = new HashMap<>();
-    namespaces.add("replication");
+    tableCfs = Maps.newHashMap();
     tableCfs.put(TABLE_A, null);
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeTableCFsMap(tableCfs);
-    builder.setExcludeNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+      .setExcludeTableCFsMap(tableCfs)
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
 
     // Namespaces config conflicts with table-cfs config
-    namespaces = new HashSet<>();
-    tableCfs = new HashMap<>();
-    namespaces.add("default");
+    tableCfs = Maps.newHashMap();
     tableCfs.put(TABLE_A, null);
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeTableCFsMap(tableCfs);
-    builder.setExcludeNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
-    namespaces = new HashSet<>();
-    tableCfs = new HashMap<>();
-    namespaces.add("replication");
-    tableCfs.put(TABLE_B, null);
-    builder.setReplicateAllUserTables(true);
-    builder.setExcludeTableCFsMap(tableCfs);
-    builder.setExcludeNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(tableCfs)
+      .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
+    assertTrue(peerConfig.needToReplicate(TABLE_B));
 
+    tableCfs = Maps.newHashMap();
+    tableCfs.put(TABLE_B, null);
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(tableCfs)
+      .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
+    assertFalse(peerConfig.needToReplicate(TABLE_B));
   }
 
   @Test
   public void testNeedToReplicateWithoutReplicatingAll() {
     ReplicationPeerConfig peerConfig;
-    ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
-      new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
-    Map<TableName, List<String>> tableCfs = new HashMap<>();
-    Set<String> namespaces = new HashSet<>();
+    Map<TableName, List<String>> tableCfs;
 
     // 1. replication_all flag is false, no namespaces and table-cfs config
-    builder.setReplicateAllUserTables(false);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
 
     // 2. replicate_all flag is false, and only config table-cfs in peer
-    // empty map
-    builder.setReplicateAllUserTables(false);
-    builder.setTableCFsMap(tableCfs);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
-    // table testB
-    tableCfs = new HashMap<>();
-    tableCfs.put(TABLE_B, null);
-    builder.setReplicateAllUserTables(false);
-    builder.setTableCFsMap(tableCfs);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+    // Set empty table-cfs map
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(Maps.newHashMap())
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
 
-    // table testA
-    tableCfs = new HashMap<>();
-    tableCfs.put(TABLE_A, null);
-    builder.setReplicateAllUserTables(false);
-    builder.setTableCFsMap(tableCfs);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+    // Set table B
+    tableCfs = Maps.newHashMap();
+    tableCfs.put(TABLE_B, null);
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(tableCfs)
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
+    assertTrue(peerConfig.needToReplicate(TABLE_B));
 
     // 3. replication_all flag is false, and only config namespace in peer
-    builder.setTableCFsMap(null);
-    // empty set
-    builder.setReplicateAllUserTables(false);
-    builder.setNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
-    // namespace default
-    namespaces = new HashSet<>();
-    namespaces.add("default");
-    builder.setReplicateAllUserTables(false);
-    builder.setNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
-    // namespace replication
-    namespaces = new HashSet<>();
-    namespaces.add("replication");
-    builder.setReplicateAllUserTables(false);
-    builder.setNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+    // Set empty namespace set
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setNamespaces(Sets.newHashSet())
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+    // Set namespace other
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+    // Set namespace replication
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
 
     // 4. replicate_all flag is false, and config namespaces and table-cfs both
     // Namespaces config doesn't conflict with table-cfs config
-    namespaces = new HashSet<>();
-    tableCfs = new HashMap<>();
-    namespaces.add("replication");
+    tableCfs = Maps.newHashMap();
     tableCfs.put(TABLE_A, null);
-    builder.setReplicateAllUserTables(false);
-    builder.setTableCFsMap(tableCfs);
-    builder.setNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(tableCfs)
+      .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
 
     // Namespaces config conflicts with table-cfs config
-    namespaces = new HashSet<>();
-    tableCfs = new HashMap<>();
-    namespaces.add("default");
+    tableCfs = Maps.newHashMap();
     tableCfs.put(TABLE_A, null);
-    builder.setReplicateAllUserTables(false);
-    builder.setTableCFsMap(tableCfs);
-    builder.setNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
-    namespaces = new HashSet<>();
-    tableCfs = new HashMap<>();
-    namespaces.add("replication");
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(tableCfs)
+      .setNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+    tableCfs = Maps.newHashMap();
     tableCfs.put(TABLE_B, null);
-    builder.setReplicateAllUserTables(false);
-    builder.setTableCFsMap(tableCfs);
-    builder.setNamespaces(namespaces);
-    peerConfig = builder.build();
-    Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+      .setTableCFsMap(tableCfs)
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+  }
+
+  @Test
+  public void testNeedToReplicateCFWithReplicatingAll() {
+    Map<TableName, List<String>> excludeTableCfs = Maps.newHashMap();
+    excludeTableCfs.put(TABLE_A, null);
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(excludeTableCfs)
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
+    assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+    assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+    excludeTableCfs = Maps.newHashMap();
+    excludeTableCfs.put(TABLE_A, Lists.newArrayList());
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(excludeTableCfs)
+      .build();
+    assertFalse(peerConfig.needToReplicate(TABLE_A));
+    assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+    assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+    excludeTableCfs = Maps.newHashMap();
+    excludeTableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1)));
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(excludeTableCfs)
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+    assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+    assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+  }
+
+  @Test
+  public void testNeedToReplicateCFWithoutReplicatingAll() {
+    Map<TableName, List<String>> tableCfs = Maps.newHashMap();
+    tableCfs.put(TABLE_A, null);
+    ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(tableCfs)
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+    assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+    assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+    tableCfs = Maps.newHashMap();
+    tableCfs.put(TABLE_A, Lists.newArrayList());
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(tableCfs)
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+    assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+    assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+    tableCfs = Maps.newHashMap();
+    tableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1)));
+    peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+      .setReplicateAllUserTables(false)
+      .setTableCFsMap(tableCfs)
+      .build();
+    assertTrue(peerConfig.needToReplicate(TABLE_A));
+    assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+    assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 58705f0..4fe04cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -18,27 +18,17 @@
 
 package org.apache.hadoop.hbase.replication;
 
-import java.util.List;
-import java.util.Map;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
 
 /**
- * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
- * exclude namespaces config, and exclude table-cfs config.
+ * Filter a WAL Entry by the peer config according to the table and family which it belongs to.
  *
- * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
- * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
- * Note: set a exclude namespace means that all tables in this namespace can't be replicated.
- *
- * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
- * But you can set namespaces or table-cfs which will be replicated to peer cluster.
- * Note: set a namespace means that all tables in this namespace will be replicated.
+ * @see ReplicationPeerConfig#needToReplicate(TableName, byte[])
  */
 @InterfaceAudience.Private
 public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
@@ -62,72 +52,12 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
   @Override
   public Cell filterCell(final Entry entry, Cell cell) {
     ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
-    if (peerConfig.replicateAllUserTables()) {
-      // replicate all user tables, but filter by exclude table-cfs config
-      final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
-      if (excludeTableCfs == null) {
-        return cell;
-      }
-
-      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-        cell = bulkLoadFilter.filterCell(cell,
-          fam -> filterByExcludeTableCfs(entry.getKey().getTableName(), Bytes.toString(fam),
-            excludeTableCfs));
-      } else {
-        if (filterByExcludeTableCfs(entry.getKey().getTableName(),
-          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
-          excludeTableCfs)) {
-          return null;
-        }
-      }
-
-      return cell;
+    TableName tableName = entry.getKey().getTableName();
+    if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+      // If the cell is about BULKLOAD event, unpack and filter it by BulkLoadCellFilter.
+      return bulkLoadFilter.filterCell(cell, fam -> !peerConfig.needToReplicate(tableName, fam));
     } else {
-      // not replicate all user tables, so filter by table-cfs config
-      final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
-      if (tableCfs == null) {
-        return cell;
-      }
-
-      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
-        cell = bulkLoadFilter.filterCell(cell,
-          fam -> filterByTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), tableCfs));
-      } else {
-        if (filterByTableCfs(entry.getKey().getTableName(),
-          Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
-          tableCfs)) {
-          return null;
-        }
-      }
-
-      return cell;
-    }
-  }
-
-  private boolean filterByExcludeTableCfs(TableName tableName, String family,
-      Map<TableName, List<String>> excludeTableCfs) {
-    List<String> excludeCfs = excludeTableCfs.get(tableName);
-    if (excludeCfs != null) {
-      // empty cfs means all cfs of this table are excluded
-      if (excludeCfs.isEmpty()) {
-        return true;
-      }
-      // ignore(remove) kv if its cf is in the exclude cfs list
-      if (excludeCfs.contains(family)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  private boolean filterByTableCfs(TableName tableName, String family,
-      Map<TableName, List<String>> tableCfs) {
-    List<String> cfs = tableCfs.get(tableName);
-    // ignore(remove) kv if its cf isn't in the replicable cf list
-    // (empty cfs means all cfs of this table are replicable)
-    if (cfs != null && !cfs.contains(family)) {
-      return true;
+      return peerConfig.needToReplicate(tableName, CellUtil.cloneFamily(cell)) ? cell : null;
     }
-    return false;
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 95bd686..f3fda67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -26,7 +26,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
@@ -269,31 +268,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
   public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
       throws ReplicationException {
     String peerId = replicationPeer.getId();
-    Set<String> namespaces = replicationPeer.getNamespaces();
-    Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
-    if (tableCFMap != null) { // All peers with TableCFs
-      List<String> tableCfs = tableCFMap.get(tableName);
-      if (tableCFMap.containsKey(tableName)
-          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else if (namespaces != null) { // Only for set NAMESPACES peers
-      if (namespaces.contains(tableName.getNamespaceAsString())) {
-        this.queueStorage.addHFileRefs(peerId, pairs);
-        metrics.incrSizeOfHFileRefsQueue(pairs.size());
-      } else {
-        LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
-            tableName, Bytes.toString(family), peerId);
-      }
-    } else {
-      // user has explicitly not defined any table cfs for replication, means replicate all the
-      // data
+    if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
       this.queueStorage.addHFileRefs(peerId, pairs);
       metrics.incrSizeOfHFileRefsQueue(pairs.size());
+    } else {
+      LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+        tableName, Bytes.toString(family), peerId);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 9c58c7a..07576b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -170,9 +170,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
     //adds cluster2 as a remote peer on cluster3
     UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
-    setupCoprocessor(UTIL1, "cluster1");
-    setupCoprocessor(UTIL2, "cluster2");
-    setupCoprocessor(UTIL3, "cluster3");
+    setupCoprocessor(UTIL1);
+    setupCoprocessor(UTIL2);
+    setupCoprocessor(UTIL3);
     BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
@@ -181,7 +181,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
       .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
   }
 
-  private void setupCoprocessor(HBaseTestingUtility cluster, String name){
+  private void setupCoprocessor(HBaseTestingUtility cluster){
     cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
       try {
         TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java
new file mode 100644
index 0000000..134ea47
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java
@@ -0,0 +1,310 @@
+/*
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category({ ReplicationTests.class, SmallTests.class})
+public class TestBulkLoadReplicationHFileRefs extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadReplicationHFileRefs.class);
+
+  private static final String PEER1_CLUSTER_ID = "peer1";
+  private static final String PEER2_CLUSTER_ID = "peer2";
+
+  private static final String REPLICATE_NAMESPACE = "replicate_ns";
+  private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
+  private static final TableName REPLICATE_TABLE =
+    TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
+  private static final TableName NO_REPLICATE_TABLE =
+    TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");
+  private static final byte[] CF_A = Bytes.toBytes("cfa");
+  private static final byte[] CF_B = Bytes.toBytes("cfb");
+
+  private byte[] row = Bytes.toBytes("r1");
+  private byte[] qualifier = Bytes.toBytes("q1");
+  private byte[] value = Bytes.toBytes("v1");
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
+  private static Admin admin1;
+  private static Admin admin2;
+
+  private static ReplicationQueueStorage queueStorage;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
+    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
+    TestReplicationBase.setUpBeforeClass();
+    admin1 = UTIL1.getConnection().getAdmin();
+    admin2 = UTIL2.getConnection().getAdmin();
+
+    queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
+      UTIL1.getConfiguration());
+
+    admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
+    admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
+    admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
+    admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
+  }
+
+  protected static void setupBulkLoadConfigsForCluster(Configuration config,
+    String clusterReplicationId) throws Exception {
+    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
+    config.writeXml(new FileOutputStream(sourceConfigFile));
+    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
+      admin1.removeReplicationPeer(peer.getPeerId());
+    }
+  }
+
+  @After
+  public void teardown() throws Exception {
+    for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
+      admin1.removeReplicationPeer(peer.getPeerId());
+    }
+    for (TableName tableName : admin1.listTableNames()) {
+      UTIL1.deleteTable(tableName);
+    }
+    for (TableName tableName : admin2.listTableNames()) {
+      UTIL2.deleteTable(tableName);
+    }
+  }
+
+  @Test
+  public void testWhenExcludeCF() throws Exception {
+    // Create table in source and remote clusters.
+    createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);
+    // Add peer, setReplicateAllUserTables true, but exclude CF_B.
+    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
+    excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL2.getClusterKey())
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(excludeTableCFs)
+      .build();
+    admin1.addReplicationPeer(PEER_ID2, peerConfig);
+    Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
+    Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
+    Assert.assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));
+
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+
+    // Bulk load data into the CF that is not replicated.
+    bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
+    Threads.sleep(1000);
+
+    // Cannot get data from remote cluster
+    Table table2 = UTIL2.getConnection().getTable(REPLICATE_TABLE);
+    Result result = table2.get(new Get(row));
+    assertTrue(Bytes.equals(null, result.getValue(CF_B, qualifier)));
+    // The extra HFile is never added to the HFileRefs
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+  }
+
+  @Test
+  public void testWhenExcludeTable() throws Exception {
+    // Create 2 tables in source and remote clusters.
+    createTableOnClusters(REPLICATE_TABLE, CF_A);
+    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
+
+    // Add peer, setReplicateAllUserTables true, but exclude one table.
+    Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
+    excludeTableCFs.put(NO_REPLICATE_TABLE, null);
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL2.getClusterKey())
+      .setReplicateAllUserTables(true)
+      .setExcludeTableCFsMap(excludeTableCFs)
+      .build();
+    admin1.addReplicationPeer(PEER_ID2, peerConfig);
+    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
+    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
+    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
+    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
+
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+
+    // Bulk load data into the table that is not replicated.
+    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
+    Threads.sleep(1000);
+
+    // Cannot get data from remote cluster
+    Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
+    Result result = table2.get(new Get(row));
+    assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
+
+    // The extra HFile is never added to the HFileRefs
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+  }
+
+  @Test
+  public void testWhenExcludeNamespace() throws Exception {
+    // Create 2 tables in source and remote clusters.
+    createTableOnClusters(REPLICATE_TABLE, CF_A);
+    createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
+
+    // Add peer, setReplicateAllUserTables true, but exclude one namespace.
+    ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+      .setClusterKey(UTIL2.getClusterKey())
+      .setReplicateAllUserTables(true)
+      .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE))
+      .build();
+    admin1.addReplicationPeer(PEER_ID2, peerConfig);
+    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
+    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
+    assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
+    assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
+
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+
+    // Bulk load data into the table of the namespace that is not replicated.
+    byte[] row = Bytes.toBytes("001");
+    byte[] value = Bytes.toBytes("v1");
+    bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
+    Threads.sleep(1000);
+
+    // Cannot get data from remote cluster
+    Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
+    Result result = table2.get(new Get(row));
+    assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
+
+    // The extra HFile is never added to the HFileRefs
+    assertEquals(0, queueStorage.getAllHFileRefs().size());
+  }
+
+  protected void bulkLoadOnCluster(TableName tableName, byte[] family)
+    throws Exception {
+    String bulkLoadFilePath = createHFileForFamilies(family);
+    copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
+    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
+    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
+  }
+
+  private String createHFileForFamilies(byte[] family) throws IOException {
+    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row)
+      .setFamily(family)
+      .setQualifier(qualifier)
+      .setValue(value)
+      .setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out =
+      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContextBuilder().build());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
+    throws Exception {
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    for (byte[] cf : cfs) {
+      builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
+    }
+    TableDescriptor td = builder.build();
+    admin1.createTable(td);
+    admin2.createTable(td);
+  }
+}