You are viewing a plain text version of this content. The canonical (HTML) version is available in the Apache mailing list archives.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/02/26 01:58:53 UTC
[hbase] branch branch-2 updated: HBASE-25590 Bulkload replication
HFileRefs cannot be cleared in some cases where set
exclude-namespace/exclude-table-cfs (#2969)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 328ff8c HBASE-25590 Bulkload replication HFileRefs cannot be cleared in some cases where set exclude-namespace/exclude-table-cfs (#2969)
328ff8c is described below
commit 328ff8c05a4ff8a0db3995c9a556acf0d3a10caa
Author: XinSun <dd...@gmail.com>
AuthorDate: Fri Feb 26 09:50:23 2021 +0800
HBASE-25590 Bulkload replication HFileRefs cannot be cleared in some cases where set exclude-namespace/exclude-table-cfs (#2969)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../hbase/replication/ReplicationPeerConfig.java | 29 +-
.../replication/TestReplicationPeerConfig.java | 366 ++++++++++++---------
.../NamespaceTableCfWALEntryFilter.java | 84 +----
.../regionserver/ReplicationSource.java | 28 +-
.../regionserver/TestBulkLoadReplication.java | 8 +-
.../TestBulkLoadReplicationHFileRefs.java | 310 +++++++++++++++++
6 files changed, 557 insertions(+), 268 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index 030ae3d..534357a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
/**
* A configuration for the replication peer cluster.
*/
@@ -372,6 +374,19 @@ public class ReplicationPeerConfig {
* @return true if the table need replicate to the peer cluster
*/
public boolean needToReplicate(TableName table) {
+ return needToReplicate(table, null);
+ }
+
+ /**
+ * Decide whether the passed family of the table need replicate to the peer cluster according to
+ * this peer config.
+ * @param table name of the table
+ * @param family family name
+ * @return true if (the family of) the table need replicate to the peer cluster.
+ * If passed family is null, return true if any CFs of the table need replicate;
+ * If passed family is not null, return true if the passed family need replicate.
+ */
+ public boolean needToReplicate(TableName table, byte[] family) {
String namespace = table.getNamespaceAsString();
if (replicateAllUserTables) {
// replicate all user tables, but filter by exclude namespaces and table-cfs config
@@ -383,9 +398,12 @@ public class ReplicationPeerConfig {
return true;
}
Collection<String> cfs = excludeTableCFsMap.get(table);
- // if cfs is null or empty then we can make sure that we do not need to replicate this table,
+ // If cfs is null or empty then we can make sure that we do not need to replicate this table,
// otherwise, we may still need to replicate the table but filter out some families.
- return cfs != null && !cfs.isEmpty();
+ return cfs != null && !cfs.isEmpty()
+ // If exclude-table-cfs contains passed family then we make sure that we do not need to
+ // replicate this family.
+ && (family == null || !cfs.contains(Bytes.toString(family)));
} else {
// Not replicate all user tables, so filter by namespaces and table-cfs config
if (namespaces == null && tableCFsMap == null) {
@@ -396,7 +414,12 @@ public class ReplicationPeerConfig {
if (namespaces != null && namespaces.contains(namespace)) {
return true;
}
- return tableCFsMap != null && tableCFsMap.containsKey(table);
+ // If table-cfs contains this table then we can make sure that we need replicate some CFs of
+ // this table. Further we need all CFs if tableCFsMap.get(table) is null or empty.
+ return tableCFsMap != null && tableCFsMap.containsKey(table)
+ && (family == null || CollectionUtils.isEmpty(tableCFsMap.get(table))
+ // If table-cfs must contain passed family then we need to replicate this family.
+ || tableCFsMap.get(table).contains(Bytes.toString(family)));
}
}
}
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
index d67a3f8..ae2d426 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationPeerConfig.java
@@ -17,21 +17,26 @@
*/
package org.apache.hadoop.hbase.replication;
-import java.util.HashMap;
-import java.util.HashSet;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
import java.util.List;
import java.util.Map;
-import java.util.Set;
+
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.BuilderStyleTest;
-import org.junit.Assert;
+import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
@Category({ClientTests.class, SmallTests.class})
public class TestReplicationPeerConfig {
@@ -39,8 +44,12 @@ public class TestReplicationPeerConfig {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestReplicationPeerConfig.class);
- private static TableName TABLE_A = TableName.valueOf("replication", "testA");
- private static TableName TABLE_B = TableName.valueOf("replication", "testB");
+ private static final String NAMESPACE_REPLICATE = "replicate";
+ private static final String NAMESPACE_OTHER = "other";
+ private static final TableName TABLE_A = TableName.valueOf(NAMESPACE_REPLICATE, "testA");
+ private static final TableName TABLE_B = TableName.valueOf(NAMESPACE_REPLICATE, "testB");
+ private static final byte[] FAMILY1 = Bytes.toBytes("cf1");
+ private static final byte[] FAMILY2 = Bytes.toBytes("cf2");
@Test
public void testClassMethodsAreBuilderStyle() {
@@ -61,193 +70,230 @@ public class TestReplicationPeerConfig {
@Test
public void testNeedToReplicateWithReplicatingAll() {
- ReplicationPeerConfig peerConfig;
- ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
- new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
- Map<TableName, List<String>> tableCfs = new HashMap<>();
- Set<String> namespaces = new HashSet<>();
-
// 1. replication_all flag is true, no namespaces and table-cfs config
- builder.setReplicateAllUserTables(true);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
// 2. replicate_all flag is true, and config in excludedTableCfs
- builder.setExcludeNamespaces(null);
- // empty map
- tableCfs = new HashMap<>();
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
- // table testB
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ // Exclude empty table-cfs map
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(Maps.newHashMap())
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
- // table testA
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+ // Exclude table B
+ Map<TableName, List<String>> tableCfs = Maps.newHashMap();
+ tableCfs.put(TABLE_B, null);
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(tableCfs)
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+ assertFalse(peerConfig.needToReplicate(TABLE_B));
// 3. replicate_all flag is true, and config in excludeNamespaces
- builder.setExcludeTableCFsMap(null);
- // empty set
- namespaces = new HashSet<>();
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
- // namespace default
- namespaces = new HashSet<>();
- namespaces.add("default");
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
- // namespace replication
- namespaces = new HashSet<>();
- namespaces.add("replication");
- builder.setReplicateAllUserTables(true);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+ // Exclude empty namespace set
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeNamespaces(Sets.newHashSet())
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // Exclude namespace other
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ // Exclude namespace replication
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
// 4. replicate_all flag is true, and config excludeNamespaces and excludedTableCfs both
// Namespaces config doesn't conflict with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
+ tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+ .setExcludeTableCFsMap(tableCfs)
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
// Namespaces config conflicts with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("default");
+ tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(true);
- builder.setExcludeTableCFsMap(tableCfs);
- builder.setExcludeNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(tableCfs)
+ .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
+ assertTrue(peerConfig.needToReplicate(TABLE_B));
+ tableCfs = Maps.newHashMap();
+ tableCfs.put(TABLE_B, null);
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(tableCfs)
+ .setExcludeNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
+ assertFalse(peerConfig.needToReplicate(TABLE_B));
}
@Test
public void testNeedToReplicateWithoutReplicatingAll() {
ReplicationPeerConfig peerConfig;
- ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl builder =
- new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl();
- Map<TableName, List<String>> tableCfs = new HashMap<>();
- Set<String> namespaces = new HashSet<>();
+ Map<TableName, List<String>> tableCfs;
// 1. replication_all flag is false, no namespaces and table-cfs config
- builder.setReplicateAllUserTables(false);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
// 2. replicate_all flag is false, and only config table-cfs in peer
- // empty map
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
- // table testB
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
+ // Set empty table-cfs map
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(Maps.newHashMap())
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
- // table testA
- tableCfs = new HashMap<>();
- tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ // Set table B
+ tableCfs = Maps.newHashMap();
+ tableCfs.put(TABLE_B, null);
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(tableCfs)
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
+ assertTrue(peerConfig.needToReplicate(TABLE_B));
// 3. replication_all flag is false, and only config namespace in peer
- builder.setTableCFsMap(null);
- // empty set
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
- // namespace default
- namespaces = new HashSet<>();
- namespaces.add("default");
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertFalse(peerConfig.needToReplicate(TABLE_A));
-
- // namespace replication
- namespaces = new HashSet<>();
- namespaces.add("replication");
- builder.setReplicateAllUserTables(false);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ // Set empty namespace set
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setNamespaces(Sets.newHashSet())
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // Set namespace other
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
+
+ // Set namespace replication
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
// 4. replicate_all flag is false, and config namespaces and table-cfs both
// Namespaces config doesn't conflict with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
+ tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(tableCfs)
+ .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
// Namespaces config conflicts with table-cfs config
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("default");
+ tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_A, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
-
- namespaces = new HashSet<>();
- tableCfs = new HashMap<>();
- namespaces.add("replication");
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(tableCfs)
+ .setNamespaces(Sets.newHashSet(NAMESPACE_OTHER))
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+
+ tableCfs = Maps.newHashMap();
tableCfs.put(TABLE_B, null);
- builder.setReplicateAllUserTables(false);
- builder.setTableCFsMap(tableCfs);
- builder.setNamespaces(namespaces);
- peerConfig = builder.build();
- Assert.assertTrue(peerConfig.needToReplicate(TABLE_A));
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setNamespaces(Sets.newHashSet(NAMESPACE_REPLICATE))
+ .setTableCFsMap(tableCfs)
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+ }
+
+ @Test
+ public void testNeedToReplicateCFWithReplicatingAll() {
+ Map<TableName, List<String>> excludeTableCfs = Maps.newHashMap();
+ excludeTableCfs.put(TABLE_A, null);
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(excludeTableCfs)
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
+ assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+ assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+ excludeTableCfs = Maps.newHashMap();
+ excludeTableCfs.put(TABLE_A, Lists.newArrayList());
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(excludeTableCfs)
+ .build();
+ assertFalse(peerConfig.needToReplicate(TABLE_A));
+ assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+ assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+ excludeTableCfs = Maps.newHashMap();
+ excludeTableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1)));
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(excludeTableCfs)
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+ assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+ assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+ }
+
+ @Test
+ public void testNeedToReplicateCFWithoutReplicatingAll() {
+ Map<TableName, List<String>> tableCfs = Maps.newHashMap();
+ tableCfs.put(TABLE_A, null);
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(tableCfs)
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+ assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+ assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+ tableCfs = Maps.newHashMap();
+ tableCfs.put(TABLE_A, Lists.newArrayList());
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(tableCfs)
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+ assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+ assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY2));
+
+ tableCfs = Maps.newHashMap();
+ tableCfs.put(TABLE_A, Lists.newArrayList(Bytes.toString(FAMILY1)));
+ peerConfig = new ReplicationPeerConfig.ReplicationPeerConfigBuilderImpl()
+ .setReplicateAllUserTables(false)
+ .setTableCFsMap(tableCfs)
+ .build();
+ assertTrue(peerConfig.needToReplicate(TABLE_A));
+ assertTrue(peerConfig.needToReplicate(TABLE_A, FAMILY1));
+ assertFalse(peerConfig.needToReplicate(TABLE_A, FAMILY2));
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
index 58705f0..4fe04cd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/NamespaceTableCfWALEntryFilter.java
@@ -18,27 +18,17 @@
package org.apache.hadoop.hbase.replication;
-import java.util.List;
-import java.util.Map;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * Filter a WAL Entry by the peer config: replicate_all flag, namespaces config, table-cfs config,
- * exclude namespaces config, and exclude table-cfs config.
+ * Filter a WAL Entry by the peer config according to the table and family which it belongs to.
*
- * If replicate_all flag is true, it means all user tables will be replicated to peer cluster. But
- * you can set exclude namespaces or exclude table-cfs which can't be replicated to peer cluster.
- * Note: set a exclude namespace means that all tables in this namespace can't be replicated.
- *
- * If replicate_all flag is false, it means all user tables can't be replicated to peer cluster.
- * But you can set namespaces or table-cfs which will be replicated to peer cluster.
- * Note: set a namespace means that all tables in this namespace will be replicated.
+ * @see ReplicationPeerConfig#needToReplicate(TableName, byte[])
*/
@InterfaceAudience.Private
public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFilter {
@@ -62,72 +52,12 @@ public class NamespaceTableCfWALEntryFilter implements WALEntryFilter, WALCellFi
@Override
public Cell filterCell(final Entry entry, Cell cell) {
ReplicationPeerConfig peerConfig = this.peer.getPeerConfig();
- if (peerConfig.replicateAllUserTables()) {
- // replicate all user tables, but filter by exclude table-cfs config
- final Map<TableName, List<String>> excludeTableCfs = peerConfig.getExcludeTableCFsMap();
- if (excludeTableCfs == null) {
- return cell;
- }
-
- if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
- cell = bulkLoadFilter.filterCell(cell,
- fam -> filterByExcludeTableCfs(entry.getKey().getTableName(), Bytes.toString(fam),
- excludeTableCfs));
- } else {
- if (filterByExcludeTableCfs(entry.getKey().getTableName(),
- Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
- excludeTableCfs)) {
- return null;
- }
- }
-
- return cell;
+ TableName tableName = entry.getKey().getTableName();
+ if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+ // If the cell is about BULKLOAD event, unpack and filter it by BulkLoadCellFilter.
+ return bulkLoadFilter.filterCell(cell, fam -> !peerConfig.needToReplicate(tableName, fam));
} else {
- // not replicate all user tables, so filter by table-cfs config
- final Map<TableName, List<String>> tableCfs = peerConfig.getTableCFsMap();
- if (tableCfs == null) {
- return cell;
- }
-
- if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
- cell = bulkLoadFilter.filterCell(cell,
- fam -> filterByTableCfs(entry.getKey().getTableName(), Bytes.toString(fam), tableCfs));
- } else {
- if (filterByTableCfs(entry.getKey().getTableName(),
- Bytes.toString(cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()),
- tableCfs)) {
- return null;
- }
- }
-
- return cell;
- }
- }
-
- private boolean filterByExcludeTableCfs(TableName tableName, String family,
- Map<TableName, List<String>> excludeTableCfs) {
- List<String> excludeCfs = excludeTableCfs.get(tableName);
- if (excludeCfs != null) {
- // empty cfs means all cfs of this table are excluded
- if (excludeCfs.isEmpty()) {
- return true;
- }
- // ignore(remove) kv if its cf is in the exclude cfs list
- if (excludeCfs.contains(family)) {
- return true;
- }
- }
- return false;
- }
-
- private boolean filterByTableCfs(TableName tableName, String family,
- Map<TableName, List<String>> tableCfs) {
- List<String> cfs = tableCfs.get(tableName);
- // ignore(remove) kv if its cf isn't in the replicable cf list
- // (empty cfs means all cfs of this table are replicable)
- if (cfs != null && !cfs.contains(family)) {
- return true;
+ return peerConfig.needToReplicate(tableName, CellUtil.cloneFamily(cell)) ? cell : null;
}
- return false;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 42757b4..8c7f0a6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -26,7 +26,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -273,31 +272,12 @@ public class ReplicationSource implements ReplicationSourceInterface {
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerId = replicationPeer.getId();
- Set<String> namespaces = replicationPeer.getNamespaces();
- Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
- if (tableCFMap != null) { // All peers with TableCFs
- List<String> tableCfs = tableCFMap.get(tableName);
- if (tableCFMap.containsKey(tableName)
- && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
- this.queueStorage.addHFileRefs(peerId, pairs);
- metrics.incrSizeOfHFileRefsQueue(pairs.size());
- } else {
- LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
- tableName, Bytes.toString(family), peerId);
- }
- } else if (namespaces != null) { // Only for set NAMESPACES peers
- if (namespaces.contains(tableName.getNamespaceAsString())) {
- this.queueStorage.addHFileRefs(peerId, pairs);
- metrics.incrSizeOfHFileRefsQueue(pairs.size());
- } else {
- LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
- tableName, Bytes.toString(family), peerId);
- }
- } else {
- // user has explicitly not defined any table cfs for replication, means replicate all the
- // data
+ if (replicationPeer.getPeerConfig().needToReplicate(tableName, family)) {
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
+ } else {
+ LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+ tableName, Bytes.toString(family), peerId);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 0212655..9161d23 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -183,9 +183,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
//adds cluster2 as a remote peer on cluster3
UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
- setupCoprocessor(UTIL1, "cluster1");
- setupCoprocessor(UTIL2, "cluster2");
- setupCoprocessor(UTIL3, "cluster3");
+ setupCoprocessor(UTIL1);
+ setupCoprocessor(UTIL2);
+ setupCoprocessor(UTIL3);
BULK_LOADS_COUNT = new AtomicInteger(0);
}
@@ -194,7 +194,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
.setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
}
- private void setupCoprocessor(HBaseTestingUtility cluster, String name){
+ private void setupCoprocessor(HBaseTestingUtility cluster){
cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
try {
TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java
new file mode 100644
index 0000000..134ea47
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplicationHFileRefs.java
@@ -0,0 +1,310 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+
+@Category({ ReplicationTests.class, SmallTests.class})
+public class TestBulkLoadReplicationHFileRefs extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBulkLoadReplicationHFileRefs.class);
+
+ private static final String PEER1_CLUSTER_ID = "peer1";
+ private static final String PEER2_CLUSTER_ID = "peer2";
+
+ private static final String REPLICATE_NAMESPACE = "replicate_ns";
+ private static final String NO_REPLICATE_NAMESPACE = "no_replicate_ns";
+ private static final TableName REPLICATE_TABLE =
+ TableName.valueOf(REPLICATE_NAMESPACE, "replicate_table");
+ private static final TableName NO_REPLICATE_TABLE =
+ TableName.valueOf(NO_REPLICATE_NAMESPACE, "no_replicate_table");
+ private static final byte[] CF_A = Bytes.toBytes("cfa");
+ private static final byte[] CF_B = Bytes.toBytes("cfb");
+
+ private byte[] row = Bytes.toBytes("r1");
+ private byte[] qualifier = Bytes.toBytes("q1");
+ private byte[] value = Bytes.toBytes("v1");
+
+ @ClassRule
+ public static TemporaryFolder testFolder = new TemporaryFolder();
+
+ private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
+ private static Admin admin1;
+ private static Admin admin2;
+
+ private static ReplicationQueueStorage queueStorage;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
+ setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
+ TestReplicationBase.setUpBeforeClass();
+ admin1 = UTIL1.getConnection().getAdmin();
+ admin2 = UTIL2.getConnection().getAdmin();
+
+ queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(UTIL1.getZooKeeperWatcher(),
+ UTIL1.getConfiguration());
+
+ admin1.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
+ admin2.createNamespace(NamespaceDescriptor.create(REPLICATE_NAMESPACE).build());
+ admin1.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
+ admin2.createNamespace(NamespaceDescriptor.create(NO_REPLICATE_NAMESPACE).build());
+ }
+
+ protected static void setupBulkLoadConfigsForCluster(Configuration config,
+ String clusterReplicationId) throws Exception {
+ config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+ File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+ File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath() + "/hbase-site.xml");
+ config.writeXml(new FileOutputStream(sourceConfigFile));
+ config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
+ admin1.removeReplicationPeer(peer.getPeerId());
+ }
+ }
+
+ @After
+ public void teardown() throws Exception {
+ for (ReplicationPeerDescription peer : admin1.listReplicationPeers()) {
+ admin1.removeReplicationPeer(peer.getPeerId());
+ }
+ for (TableName tableName : admin1.listTableNames()) {
+ UTIL1.deleteTable(tableName);
+ }
+ for (TableName tableName : admin2.listTableNames()) {
+ UTIL2.deleteTable(tableName);
+ }
+ }
+
+ @Test
+ public void testWhenExcludeCF() throws Exception {
+ // Create table in source and remote clusters.
+ createTableOnClusters(REPLICATE_TABLE, CF_A, CF_B);
+ // Add peer, setReplicateAllUserTables true, but exclude CF_B.
+ Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
+ excludeTableCFs.put(REPLICATE_TABLE, Lists.newArrayList(Bytes.toString(CF_B)));
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL2.getClusterKey())
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(excludeTableCFs)
+ .build();
+ admin1.addReplicationPeer(PEER_ID2, peerConfig);
+ Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
+ Assert.assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
+ Assert.assertFalse(peerConfig.needToReplicate(REPLICATE_TABLE, CF_B));
+
+ assertEquals(0, queueStorage.getAllHFileRefs().size());
+
+ // Bulk load data into the CF that is not replicated.
+ bulkLoadOnCluster(REPLICATE_TABLE, CF_B);
+ Threads.sleep(1000);
+
+ // Cannot get data from remote cluster
+ Table table2 = UTIL2.getConnection().getTable(REPLICATE_TABLE);
+ Result result = table2.get(new Get(row));
+ assertTrue(Bytes.equals(null, result.getValue(CF_B, qualifier)));
+ // The extra HFile is never added to the HFileRefs
+ assertEquals(0, queueStorage.getAllHFileRefs().size());
+ }
+
+ @Test
+ public void testWhenExcludeTable() throws Exception {
+ // Create 2 tables in source and remote clusters.
+ createTableOnClusters(REPLICATE_TABLE, CF_A);
+ createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
+
+ // Add peer, setReplicateAllUserTables true, but exclude one table.
+ Map<TableName, List<String>> excludeTableCFs = Maps.newHashMap();
+ excludeTableCFs.put(NO_REPLICATE_TABLE, null);
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL2.getClusterKey())
+ .setReplicateAllUserTables(true)
+ .setExcludeTableCFsMap(excludeTableCFs)
+ .build();
+ admin1.addReplicationPeer(PEER_ID2, peerConfig);
+ assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
+ assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
+ assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
+ assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
+
+ assertEquals(0, queueStorage.getAllHFileRefs().size());
+
+ // Bulk load data into the table that is not replicated.
+ bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
+ Threads.sleep(1000);
+
+ // Cannot get data from remote cluster
+ Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
+ Result result = table2.get(new Get(row));
+ assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
+
+ // The extra HFile is never added to the HFileRefs
+ assertEquals(0, queueStorage.getAllHFileRefs().size());
+ }
+
+ @Test
+ public void testWhenExcludeNamespace() throws Exception {
+ // Create 2 tables in source and remote clusters.
+ createTableOnClusters(REPLICATE_TABLE, CF_A);
+ createTableOnClusters(NO_REPLICATE_TABLE, CF_A);
+
+ // Add peer, setReplicateAllUserTables true, but exclude one namespace.
+ ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+ .setClusterKey(UTIL2.getClusterKey())
+ .setReplicateAllUserTables(true)
+ .setExcludeNamespaces(Sets.newHashSet(NO_REPLICATE_NAMESPACE))
+ .build();
+ admin1.addReplicationPeer(PEER_ID2, peerConfig);
+ assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE));
+ assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE));
+ assertTrue(peerConfig.needToReplicate(REPLICATE_TABLE, CF_A));
+ assertFalse(peerConfig.needToReplicate(NO_REPLICATE_TABLE, CF_A));
+
+ assertEquals(0, queueStorage.getAllHFileRefs().size());
+
+ // Bulk load data into the table of the namespace that is not replicated.
+ byte[] row = Bytes.toBytes("001");
+ byte[] value = Bytes.toBytes("v1");
+ bulkLoadOnCluster(NO_REPLICATE_TABLE, CF_A);
+ Threads.sleep(1000);
+
+ // Cannot get data from remote cluster
+ Table table2 = UTIL2.getConnection().getTable(NO_REPLICATE_TABLE);
+ Result result = table2.get(new Get(row));
+ assertTrue(Bytes.equals(null, result.getValue(CF_A, qualifier)));
+
+ // The extra HFile is never added to the HFileRefs
+ assertEquals(0, queueStorage.getAllHFileRefs().size());
+ }
+
+ protected void bulkLoadOnCluster(TableName tableName, byte[] family)
+ throws Exception {
+ String bulkLoadFilePath = createHFileForFamilies(family);
+ copyToHdfs(family, bulkLoadFilePath, UTIL1.getDFSCluster());
+ BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(UTIL1.getConfiguration());
+ bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
+ }
+
+ private String createHFileForFamilies(byte[] family) throws IOException {
+ CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+ cellBuilder.setRow(row)
+ .setFamily(family)
+ .setQualifier(qualifier)
+ .setValue(value)
+ .setType(Cell.Type.Put);
+
+ HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(UTIL1.getConfiguration());
+ File hFileLocation = testFolder.newFile();
+ FSDataOutputStream out =
+ new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+ try {
+ hFileFactory.withOutputStream(out);
+ hFileFactory.withFileContext(new HFileContextBuilder().build());
+ HFile.Writer writer = hFileFactory.create();
+ try {
+ writer.append(new KeyValue(cellBuilder.build()));
+ } finally {
+ writer.close();
+ }
+ } finally {
+ out.close();
+ }
+ return hFileLocation.getAbsoluteFile().getAbsolutePath();
+ }
+
+ private void copyToHdfs(byte[] family, String bulkLoadFilePath, MiniDFSCluster cluster)
+ throws Exception {
+ Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, Bytes.toString(family));
+ cluster.getFileSystem().mkdirs(bulkLoadDir);
+ cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+ }
+
+ private void createTableOnClusters(TableName tableName, byte[]... cfs) throws IOException {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+ for (byte[] cf : cfs) {
+ builder.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build());
+ }
+ TableDescriptor td = builder.build();
+ admin1.createTable(td);
+ admin2.createTable(td);
+ }
+}