You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2019/12/04 01:39:48 UTC
[hbase] branch branch-2.2 updated: HBASE-23345 Table need to
replication unless all of cfs are excluded
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 7ccb6f7 HBASE-23345 Table need to replication unless all of cfs are excluded
7ccb6f7 is described below
commit 7ccb6f7141e17cb0712d40a98d5bea9c11083242
Author: ddupg <dd...@gmail.com>
AuthorDate: Thu Nov 28 18:57:56 2019 +0800
HBASE-23345 Table need to replication unless all of cfs are excluded
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../hbase/replication/ReplicationPeerConfig.java | 25 ++-
.../replication/TestReplicationPeerConfig.java | 202 ++++++++++++++++++
.../hadoop/hbase/replication/ReplicationUtils.java | 38 ----
.../hbase/replication/TestReplicationUtil.java | 235 ---------------------
.../master/replication/ModifyPeerProcedure.java | 7 +-
.../master/replication/ReplicationPeerManager.java | 2 +-
.../replication/UpdatePeerConfigProcedure.java | 6 +-
.../NamespaceTableCfWALEntryFilter.java | 2 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 7 +-
.../TestReplicationWALEntryFilters.java | 116 +++++-----
10 files changed, 278 insertions(+), 362 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index e0d9a4c..7c0f115 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -366,22 +366,31 @@ public class ReplicationPeerConfig {
* @return true if the table need replicate to the peer cluster
*/
public boolean needToReplicate(TableName table) {
+ String namespace = table.getNamespaceAsString();
if (replicateAllUserTables) {
- if (excludeNamespaces != null && excludeNamespaces.contains(table.getNamespaceAsString())) {
+ // replicate all user tables, but filter by exclude namespaces and table-cfs config
+ if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
return false;
}
- if (excludeTableCFsMap != null && excludeTableCFsMap.containsKey(table)) {
- return false;
+ // trap here, must check existence first since HashMap allows null value.
+ if (excludeTableCFsMap == null || !excludeTableCFsMap.containsKey(table)) {
+ return true;
}
- return true;
+ Collection<String> cfs = excludeTableCFsMap.get(table);
+ // if cfs is null or empty then we can make sure that we do not need to replicate this table,
+ // otherwise, we may still need to replicate the table but filter out some families.
+ return cfs != null && !cfs.isEmpty();
} else {
- if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
- return true;
+ // Not replicate all user tables, so filter by namespaces and table-cfs config
+ if (namespaces == null && tableCFsMap == null) {
+ return false;
}
- if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
+ // First filter by namespaces config
+ // If table's namespace in peer config, all the tables data are applicable for replication
+ if (namespaces != null && namespaces.contains(namespace)) {
return true;
}
- return false;
+ return tableCFsMap != null && tableCFsMap.containsKey(table);
}
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
index 881ef45..d67a3f8 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
@@ -17,10 +17,17 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BuilderStyleTest;
+import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -32,6 +39,9 @@ public class TestReplicationPeerConfig {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
+ private static TableName TABLE_A = TableName.valueOf("replication", "testA");
+ private static TableName TABLE_B = TableName.valueOf("replication", "testB");
+
@Test
public void testClassMethodsAreBuilderStyle() {
/* ReplicationPeerConfig should have a builder style setup where setXXX/addXXX methods
@@ -48,4 +58,196 @@ public class TestReplicationPeerConfig {
BuilderStyleTest.assertClassesAreBuilderStyle(ReplicationPeerConfig.class);
}
+
+ @Test
+ public void testNeedToReplicateWithReplicatingAll() {
+ ReplicationPeerConfig peerConfig;
+ ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
+ new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
+ Map<TableName, List<String>> tableCfs = new HashMap<>();
+ Set<String> namespaces = new HashSet<>();
+
+ // 1. replication_all flag is true, no namespaces and table-cfs config
+ builder.setReplicateAllUserTables(true);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // 2. replicate_all flag is true, and config in excludedTableCfs
+ builder.setExcludeNamespaces(null);
+ // empty map
+ tableCfs = new HashMap<>();
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // table testB
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // table testA
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // 3. replicate_all flag is true, and config in excludeNamespaces
+ builder.setExcludeTableCFsMap(null);
+ // empty set
+ namespaces = new HashSet<>();
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace default
+ namespaces = new HashSet<>();
+ namespaces.add("default");
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace replication
+ namespaces = new HashSet<>();
+ namespaces.add("replication");
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
+ // Namespaces config doesn't conflict with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // Namespaces config conflicts with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("default");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(true);
+ builder.setExcludeTableCFsMap(tableCfs);
+ builder.setExcludeNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ }
+
+ @Test
+ public void testNeedToReplicateWithoutReplicatingAll() {
+ ReplicationPeerConfig peerConfig;
+ ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
+ new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
+ Map<TableName, List<String>> tableCfs = new HashMap<>();
+ Set<String> namespaces = new HashSet<>();
+
+ // 1. replication_all flag is false, no namespaces and table-cfs config
+ builder.setReplicateAllUserTables(false);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // 2. replicate_all flag is false, and only config table-cfs in peer
+ // empty map
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // table testB
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // table testA
+ tableCfs = new HashMap<>();
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // 3. replication_all flag is false, and only config namespace in peer
+ builder.setTableCFsMap(null);
+ // empty set
+ builder.setReplicateAllUserTables(false);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace default
+ namespaces = new HashSet<>();
+ namespaces.add("default");
+ builder.setReplicateAllUserTables(false);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // namespace replication
+ namespaces = new HashSet<>();
+ namespaces.add("replication");
+ builder.setReplicateAllUserTables(false);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // 4. replicate_all flag is false, and config namespaces and table-cfs both
+ // Namespaces config doesn't conflict with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // Namespaces config conflicts with table-cfs config
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("default");
+ tableCfs.put(TABLE_A, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ namespaces = new HashSet<>();
+ tableCfs = new HashMap<>();
+ namespaces.add("replication");
+ tableCfs.put(TABLE_B, null);
+ builder.setReplicateAllUserTables(false);
+ builder.setTableCFsMap(tableCfs);
+ builder.setNamespaces(namespaces);
+ peerConfig = builder.build();
+ Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ }
}
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index b2b87a4..7cfb9d4 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -135,44 +135,6 @@ public final class ReplicationUtils {
}
/**
- * Returns whether we should replicate the given table.
- */
- public static boolean contains(ReplicationPeerConfig peerConfig, TableName tableName) {
- String namespace = tableName.getNamespaceAsString();
- if (peerConfig.replicateAllUserTables()) {
- // replicate all user tables, but filter by exclude namespaces and table-cfs config
- Set<String> excludeNamespaces = peerConfig.getExcludeNamespaces();
- if (excludeNamespaces != null && excludeNamespaces.contains(namespace)) {
- return false;
- }
- Map<TableName, List<String>> excludedTableCFs = peerConfig.getExcludeTableCFsMap();
- // trap here, must check existence first since HashMap allows null value.
- if (excludedTableCFs == null || !excludedTableCFs.containsKey(tableName)) {
- return true;
- }
- List<String> cfs = excludedTableCFs.get(tableName);
- // if cfs is null or empty then we can make sure that we do not need to replicate this table,
- // otherwise, we may still need to replicate the table but filter out some families.
- return cfs != null && !cfs.isEmpty();
- } else {
- // Not replicate all user tables, so filter by namespaces and table-cfs config
- Set<String> namespaces = peerConfig.getNamespaces();
- Map<TableName, List<String>> tableCFs = peerConfig.getTableCFsMap();
-
- if (namespaces == null && tableCFs == null) {
- return false;
- }
-
- // First filter by namespaces config
- // If table's namespace in peer config, all the tables data are applicable for replication
- if (namespaces != null && namespaces.contains(namespace)) {
- return true;
- }
- return tableCFs != null && tableCFs.containsKey(tableName);
- }
- }
-
- /**
* Get the adaptive timeout value when performing a retry
*/
public static int getAdaptiveTimeout(final int initialValue, final int retries) {
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java
deleted file mode 100644
index f8543fe..0000000
--- a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationUtil.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.junit.Assert;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, SmallTests.class })
-public class TestReplicationUtil {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationUtil.class);
-
- private static TableName TABLE_A = TableName.valueOf("replication", "testA");
- private static TableName TABLE_B = TableName.valueOf("replication", "testB");
-
- @Test
- public void testContainsWithReplicatingAll() {
- ReplicationPeerConfig peerConfig;
- ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
- new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
- Map<TableName, List<String>> tableCfs = new HashMap<>();
- Set<String> namespaces = new HashSet<>();
-
- // 1. replication_all flag is true, no namespaces and table-cfs config
- builder.setReplicateAllUserTables(true);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 2. replicate_all flag is true, and config in excludedTableCfs
- builder.setExcludeNamespaces(null);
- // empty map
- tableCfs = new HashMap<>();
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testB
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testA
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 3. replicate_all flag is true, and config in excludeNamespaces
- builder.setExcludeTableCFsMap(null);
- // empty set
- namespaces = new HashSet<>();
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace default
- namespaces = new HashSet<>();
- namespaces.add("default");
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace replication
- namespaces = new HashSet<>();
- namespaces.add("replication");
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
- // Namespaces config doesn't conflict with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // Namespaces config conflicts with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("default");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- }
-
- @Test
- public void testContainsWithoutReplicatingAll() {
- ReplicationPeerConfig peerConfig;
- ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
- new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
- Map<TableName, List<String>> tableCfs = new HashMap<>();
- Set<String> namespaces = new HashSet<>();
-
- // 1. replication_all flag is false, no namespaces and table-cfs config
- builder.setReplicateAllUserTables(false);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 2. replicate_all flag is false, and only config table-cfs in peer
- // empty map
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testB
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // table testA
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 3. replication_all flag is false, and only config namespace in peer
- builder.setTableCFsMap(null);
- // empty set
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace default
- namespaces = new HashSet<>();
- namespaces.add("default");
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // namespace replication
- namespaces = new HashSet<>();
- namespaces.add("replication");
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // 4. replicate_all flag is false, and config namespaces and table-cfs both
- // Namespaces config doesn't conflict with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- // Namespaces config conflicts with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("default");
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
-
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(ReplicationUtils.contains(peerConfig, TABLE_A));
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
index 0a6eb2a..fc792cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ModifyPeerProcedure.java
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.yetus.audience.InterfaceAudience;
@@ -168,11 +167,11 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
continue;
}
TableName tn = td.getTableName();
- if (!ReplicationUtils.contains(peerConfig, tn)) {
+ if (!peerConfig.needToReplicate(tn)) {
continue;
}
if (oldPeerConfig != null && oldPeerConfig.isSerial() &&
- ReplicationUtils.contains(oldPeerConfig, tn)) {
+ oldPeerConfig.needToReplicate(tn)) {
continue;
}
if (needReopen(tsm, tn)) {
@@ -206,7 +205,7 @@ public abstract class ModifyPeerProcedure extends AbstractPeerProcedure<PeerModi
continue;
}
TableName tn = td.getTableName();
- if (!ReplicationUtils.contains(peerConfig, tn)) {
+ if (!peerConfig.needToReplicate(tn)) {
continue;
}
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
index 17bbb82..29a8a1b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/ReplicationPeerManager.java
@@ -359,7 +359,7 @@ public class ReplicationPeerManager {
public List<String> getSerialPeerIdsBelongsTo(TableName tableName) {
return peers.values().stream().filter(p -> p.getPeerConfig().isSerial())
- .filter(p -> ReplicationUtils.contains(p.getPeerConfig(), tableName)).map(p -> p.getPeerId())
+ .filter(p -> p.getPeerConfig().needToReplicate(tableName)).map(p -> p.getPeerId())
.collect(Collectors.toList());
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
index 41e740f..188921a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/replication/UpdatePeerConfigProcedure.java
@@ -128,15 +128,15 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
continue;
}
TableName tn = td.getTableName();
- if (ReplicationUtils.contains(oldPeerConfig, tn)) {
- if (!ReplicationUtils.contains(peerConfig, tn)) {
+ if (oldPeerConfig.needToReplicate(tn)) {
+ if (!peerConfig.needToReplicate(tn)) {
// removed from peer config
for (String encodedRegionName : MetaTableAccessor
.getTableEncodedRegionNamesForSerialReplication(conn, tn)) {
addToList(encodedRegionNames, encodedRegionName, queueStorage);
}
}
- } else if (ReplicationUtils.contains(peerConfig, tn)) {
+ } else if (peerConfig.needToReplicate(tn)) {
// newly added to peer config
setLastPushedSequenceIdForTable(env, tn, lastSeqIds);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 3a3200a..58705f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -52,7 +52,7 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
@Override
public Entry filter(Entry entry) {
- if (ReplicationUtils.contains(this.peer.getPeerConfig(), entry.getKey().getTableName())) {
+ if (this.peer.getPeerConfig().needToReplicate(entry.getKey().getTableName())) {
return entry;
} else {
return null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 8a29a5a..a6ece21 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -116,7 +116,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
@@ -3927,8 +3926,8 @@ public class HBaseFsck extends Configured implements Closeable {
List<ReplicationPeerDescription> peerDescriptions = admin.listReplicationPeers();
if (peerDescriptions != null && peerDescriptions.size() > 0) {
List<String> peers = peerDescriptions.stream()
- .filter(peerConfig -> ReplicationUtils.contains(peerConfig.getPeerConfig(),
- cleanReplicationBarrierTable))
+ .filter(peerConfig -> peerConfig.getPeerConfig()
+ .needToReplicate(cleanReplicationBarrierTable))
.map(peerConfig -> peerConfig.getPeerId()).collect(Collectors.toList());
try {
List<String> batch = new ArrayList<>();
@@ -3997,4 +3996,4 @@ public class HBaseFsck extends Configured implements Closeable {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index f2c5e50..b0c2aa7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -213,13 +213,11 @@ public class TestReplicationWALEntryFilters {
@Test
public void testNamespaceTableCfWALEntryFilter() {
ReplicationPeer peer = mock(ReplicationPeer.class);
- ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
+ ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
// 1. replicate_all flag is false, no namespaces and table-cfs config
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(null);
- when(peerConfig.getTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(null).setTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
@@ -229,9 +227,8 @@ public class TestReplicationWALEntryFilters {
// empty map
userEntry = createEntry(null, a, b, c);
Map<TableName, List<String>> tableCfs = new HashMap<>();
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -239,9 +236,8 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("bar"), null);
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -249,9 +245,8 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a), filter.filter(userEntry));
@@ -259,9 +254,8 @@ public class TestReplicationWALEntryFilters {
userEntry = createEntry(null, a, b, c, d);
tableCfs = new HashMap<>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,c), filter.filter(userEntry));
@@ -269,19 +263,17 @@ public class TestReplicationWALEntryFilters {
when(peer.getTableCFs()).thenReturn(null);
// empty set
Set<String> namespaces = new HashSet<>();
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
// namespace default
namespaces.add("default");
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a,b,c), filter.filter(userEntry));
@@ -289,9 +281,8 @@ public class TestReplicationWALEntryFilters {
// namespace ns1
namespaces = new HashSet<>();
namespaces.add("ns1");
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -302,10 +293,9 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, c), filter.filter(userEntry));
@@ -314,10 +304,9 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("ns1:foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -326,10 +315,9 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("bar"), null);
- when(peerConfig.replicateAllUserTables()).thenReturn(false);
- when(peerConfig.getNamespaces()).thenReturn(namespaces);
- when(peerConfig.getTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(false).setNamespaces(namespaces)
+ .setTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -338,14 +326,14 @@ public class TestReplicationWALEntryFilters {
@Test
public void testNamespaceTableCfWALEntryFilter2() {
ReplicationPeer peer = mock(ReplicationPeer.class);
- ReplicationPeerConfig peerConfig = mock(ReplicationPeerConfig.class);
+ ReplicationPeerConfigBuilder peerConfigBuilder = ReplicationPeerConfig.newBuilder();
// 1. replicate_all flag is true
// and no exclude namespaces and no exclude table-cfs config
- when(peerConfig.replicateAllUserTables()).thenReturn(true);
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setReplicateAllUserTables(true)
+ .setExcludeNamespaces(null)
+ .setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
Entry userEntry = createEntry(null, a, b, c);
ChainWALEntryFilter filter =
new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
@@ -354,18 +342,16 @@ public class TestReplicationWALEntryFilters {
// 2. replicate_all flag is true, and only config exclude namespaces
// empty set
Set<String> namespaces = new HashSet<String>();
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
// exclude namespace default
namespaces.add("default");
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));
@@ -373,9 +359,8 @@ public class TestReplicationWALEntryFilters {
// exclude namespace ns1
namespaces = new HashSet<String>();
namespaces.add("ns1");
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(null);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(null);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -383,9 +368,8 @@ public class TestReplicationWALEntryFilters {
// 3. replicate_all flag is true, and only config exclude table-cfs
// empty table-cfs map
Map<TableName, List<String>> tableCfs = new HashMap<TableName, List<String>>();
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -393,9 +377,8 @@ public class TestReplicationWALEntryFilters {
// exclude table bar
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("bar"), null);
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, a, b, c), filter.filter(userEntry));
@@ -403,9 +386,8 @@ public class TestReplicationWALEntryFilters {
// exclude table foo:a
tableCfs = new HashMap<TableName, List<String>>();
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a"));
- when(peerConfig.getExcludeNamespaces()).thenReturn(null);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(null).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b, c), filter.filter(userEntry));
@@ -416,9 +398,8 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("ns1");
tableCfs.put(TableName.valueOf("foo"), Lists.newArrayList("a", "c"));
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(createEntry(null, b), filter.filter(userEntry));
@@ -428,9 +409,8 @@ public class TestReplicationWALEntryFilters {
tableCfs = new HashMap<TableName, List<String>>();
namespaces.add("default");
tableCfs.put(TableName.valueOf("ns1:bar"), new ArrayList<String>());
- when(peerConfig.getExcludeNamespaces()).thenReturn(namespaces);
- when(peerConfig.getExcludeTableCFsMap()).thenReturn(tableCfs);
- when(peer.getPeerConfig()).thenReturn(peerConfig);
+ peerConfigBuilder.setExcludeNamespaces(namespaces).setExcludeTableCFsMap(tableCfs);
+ when(peer.getPeerConfig()).thenReturn(peerConfigBuilder.build());
userEntry = createEntry(null, a, b, c);
filter = new ChainWALEntryFilter(new NamespaceTableCfWALEntryFilter(peer));
assertEquals(null, filter.filter(userEntry));