Posted to commits@hbase.apache.org by wc...@apache.org on 2019/09/23 16:23:52 UTC
[hbase] branch branch-2.1 updated: HBASE-22380 break circle replication when doing bulkload
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 11f649e HBASE-22380 break circle replication when doing bulkload
11f649e is described below
commit 11f649eefd31ba5d875a0a918313c90663a81129
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Mon Sep 23 17:16:15 2019 +0100
HBASE-22380 break circle replication when doing bulkload
Signed-off-by: stack <st...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Norbert Kalmar <nk...@cloudera.com>
(cherry picked from commit 38c8bd37319325f97b1a6fe8a64c0c71683782b9)
---
.../apache/hadoop/hbase/regionserver/HRegion.java | 9 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 10 +-
.../hbase/regionserver/SecureBulkLoadManager.java | 10 +-
.../replication/regionserver/HFileReplicator.java | 5 +-
.../replication/regionserver/ReplicationSink.java | 44 ++-
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 8 +-
.../regionserver/TestBulkLoadReplication.java | 330 +++++++++++++++++++++
7 files changed, 393 insertions(+), 23 deletions(-)
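
At a high level, the patch threads a list of cluster ids through every bulk load call path: each cluster appends its own id before the event is replicated onwards, and a region server that finds its own id in an incoming request acknowledges the load without redoing it. A minimal sketch of that guard, paraphrasing the RSRpcServices change further below (localClusterId is a placeholder for the region server field used in the actual patch):

    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(localClusterId)) {
      // This cluster already handled the event once; report success and stop,
      // which is what breaks the replication circle.
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    }
    clusterIds.add(localClusterId); // record this hop before propagating further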
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index f3b5a90..e95e13e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6046,7 +6046,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
- return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
}
/**
@@ -6091,11 +6091,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
* @param copyFile always copy hfiles if true
+ * @param clusterIds ids of clusters that have already handled the given bulkload event.
* @return Map from family to List of store file paths if successful, null if failed recoverably
* @throws IOException if failed unrecoverably.
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
- boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+ boolean assignSeqId, BulkLoadListener bulkLoadListener,
+ boolean copyFile, List<String> clusterIds) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6270,8 +6272,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
- storeFiles,
- storeFilesSizes, seqId);
+ storeFiles, storeFilesSizes, seqId, clusterIds);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
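
With the extra clusterIds argument, the bulk load marker written to the WAL above now records which clusters have already processed the event. A hedged sketch of how a sink-side reader recovers that provenance from the marker cell (getClusterIdsList() is the protobuf accessor this patch relies on; the .proto change itself is not part of this diff):

    WALProtos.BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
    List<String> alreadyHandled = bld.getClusterIdsList(); // empty for pre-patch markers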
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 2b3bec7..aa54876 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2305,6 +2305,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
final BulkLoadHFileRequest request) throws ServiceException {
long start = EnvironmentEdgeManager.currentTime();
+ List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+ if(clusterIds.contains(this.regionServer.clusterId)){
+ return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+ } else {
+ clusterIds.add(this.regionServer.clusterId);
+ }
try {
checkOpen();
requestCount.increment();
@@ -2337,7 +2343,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
try {
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
- request.getCopyFile());
+ request.getCopyFile(), clusterIds);
} finally {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
@@ -2345,7 +2351,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
} else {
// secure bulk load
- map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+ map = regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
}
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(map != null);
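
A worked trace of the guard above, for two clusters A and B replicating to each other: a client bulk loads on A with an empty id list, so A adds itself and the replicated event reaches B carrying [A]; B loads it, adds itself, and the event travels back to A carrying [A, B]; A now finds its own id in the list, answers with setLoaded(true) without loading anything, and the cycle ends there.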
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 6b55744..f51608d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -212,7 +212,12 @@ public class SecureBulkLoadManager {
}
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
- final BulkLoadHFileRequest request) throws IOException {
+ final BulkLoadHFileRequest request) throws IOException {
+ return secureBulkLoadHFiles(region, request, null);
+ }
+
+ public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+ final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -288,7 +293,8 @@ public class SecureBulkLoadManager {
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
- new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+ new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+ clusterIds);
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index ab9a236..c7fed77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -87,17 +87,19 @@ public class HFileReplicator {
private ThreadPoolExecutor exec;
private int maxCopyThreads;
private int copiesPerThread;
+ private List<String> sourceClusterIds;
public HFileReplicator(Configuration sourceClusterConf,
String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
- Connection connection) throws IOException {
+ Connection connection, List<String> sourceClusterIds) throws IOException {
this.sourceClusterConf = sourceClusterConf;
this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
this.bulkLoadHFileMap = tableQueueMap;
this.conf = conf;
this.connection = connection;
+ this.sourceClusterIds = sourceClusterIds;
userProvider = UserProvider.instantiate(conf);
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -128,6 +130,7 @@ public class HFileReplicator {
LoadIncrementalHFiles loadHFiles = null;
try {
loadHFiles = new LoadIncrementalHFiles(conf);
+ loadHFiles.setClusterIds(sourceClusterIds);
} catch (Exception e) {
LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
+ " data.", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index fb4e0f9..8079adc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -174,9 +174,7 @@ public class ReplicationSink {
// invocation of this method per table and cluster id.
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
- // Map of table name Vs list of pair of family and list of hfile paths from its namespace
- Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+ Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
for (WALEntry entry : entries) {
TableName table =
TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -204,10 +202,19 @@ public class ReplicationSink {
Cell cell = cells.current();
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ if(bulkLoadsPerClusters == null) {
+ bulkLoadsPerClusters = new HashMap<>();
+ }
+ // Map of table name Vs list of pair of family and list of
+ // hfile paths from its namespace
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+ bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
+ bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
- buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
@@ -243,14 +250,26 @@ public class ReplicationSink {
LOG.debug("Finished replicating mutations.");
}
- if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
- LOG.debug("Started replicating bulk loaded data.");
- HFileReplicator hFileReplicator =
- new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+ if(bulkLoadsPerClusters != null) {
+ for (Entry<List<String>, Map<String, List<Pair<byte[],
+ List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+ if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+ entry.getKey().toString());
+ }
+ HFileReplicator hFileReplicator =
+ new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
- getConnection());
- hFileReplicator.replicate();
- LOG.debug("Finished replicating bulk loaded data.");
+ getConnection(), entry.getKey());
+ hFileReplicator.replicate();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
+ entry.getKey().toString());
+ }
+ }
+ }
}
int size = entries.size();
@@ -265,8 +284,7 @@ public class ReplicationSink {
private void buildBulkLoadHFileMap(
final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
- Cell cell) throws IOException {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ BulkLoadDescriptor bld) throws IOException {
List<StoreDescriptor> storesList = bld.getStoresList();
int storesSize = storesList.size();
for (int j = 0; j < storesSize; j++) {
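
The sink now buckets bulk load edits by the exact list of cluster ids that have already seen them, so each HFileReplicator run ships a single consistent provenance list. A hedged sketch of that grouping pattern, using the same types as the patch:

    Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> perClusterIds =
        new HashMap<>();
    // one bucket per distinct provenance list, created lazily as in the patch
    Map<String, List<Pair<byte[], List<String>>>> bucket =
        perClusterIds.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());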
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index e4d5dcb..950f2a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -137,6 +137,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private String bulkToken;
+ private List<String> clusterIds = new ArrayList<>();
+
/**
* Represents an HFile waiting to be loaded. A queue is used in this class in order to support
* the case where a region has split during the process of the load. When this happens, the HFile
@@ -539,7 +541,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+ assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
}
return success ? regionName : null;
} finally {
@@ -1251,6 +1253,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
this.bulkToken = stagingDir;
}
+ public void setClusterIds(List<String> clusterIds) {
+ this.clusterIds = clusterIds;
+ }
+
/**
* Infers region boundaries for a new table.
* <p>
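
The new setter is the hand-off point between replication and the bulk load tool. A hedged usage sketch (conf and sourceClusterIds are placeholders for values the caller holds; a direct client invocation simply leaves the default empty list):

    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.setClusterIds(sourceClusterIds); // ids travel with the secure bulk load RPC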
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
new file mode 100644
index 0000000..4d69bdf
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -0,0 +1,330 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of the defined test clusters, it performs a bulk load and asserts that the values in
+ * the bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads,
+ * with the given replication topology all these bulk loads should get replicated only once on
+ * each peer. To assert this, this test defines a preBulkLoad coprocessor and adds it to all test
+ * table regions, on each of the clusters. This CP counts the number of times bulk load actually
+ * gets invoked, certifying we are not entering the infinite loop condition addressed by
+ * HBASE-22380.
+ */
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestBulkLoadReplication extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(TestBulkLoadReplication.class);
+
+ private static final String PEER1_CLUSTER_ID = "peer1";
+ private static final String PEER4_CLUSTER_ID = "peer4";
+ private static final String PEER3_CLUSTER_ID = "peer3";
+
+ private static final String PEER_ID1 = "1";
+ private static final String PEER_ID3 = "3";
+ private static final String PEER_ID4 = "4";
+
+ private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+ private static CountDownLatch BULK_LOAD_LATCH;
+
+ private static HBaseTestingUtility utility3;
+ private static HBaseTestingUtility utility4;
+ private static Configuration conf3;
+ private static Configuration conf4;
+ private static Table htable3;
+ private static Table htable4;
+
+ @Rule
+ public TestName name = new TestName();
+
+ @ClassRule
+ public static TemporaryFolder testFolder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
+ conf3 = HBaseConfiguration.create(conf1);
+ conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
+ utility3 = new HBaseTestingUtility(conf3);
+ conf4 = HBaseConfiguration.create(conf1);
+ conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4");
+ utility4 = new HBaseTestingUtility(conf4);
+ TestReplicationBase.setUpBeforeClass();
+ setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
+ //utility4 is started within TestReplicationBase.setUpBeforeClass(), but the bulkload
+ //replication configs had not been set yet, so we set up a 4th utility.
+ setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
+ startCluster(utility3, conf3);
+ startCluster(utility4, conf4);
+ }
+
+ private static void startCluster(HBaseTestingUtility util, Configuration configuration)
+ throws Exception {
+ LOG.info("Setup Zk to same one from utility1 and utility4");
+ util.setZkCluster(utility1.getZkCluster());
+ util.startMiniCluster(2);
+
+ TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+ Connection connection = ConnectionFactory.createConnection(configuration);
+ try (Admin admin = connection.getAdmin()) {
+ admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ }
+ util.waitUntilAllRegionsAssigned(tableName);
+ }
+
+ @Before
+ @Override
+ public void setUpBase() throws Exception {
+ super.setUpBase();
+ ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1);
+ ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4);
+ ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3);
+ //adds cluster4 as a remote peer on cluster1
+ utility1.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
+ //adds cluster1 as a remote peer on cluster4
+ utility4.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+ //adds cluster3 as a remote peer on cluster4
+ utility4.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+ //adds cluster4 as a remote peer on cluster3
+ utility3.getAdmin().addReplicationPeer(PEER_ID4, peer4Config);
+ setupCoprocessor(utility1);
+ setupCoprocessor(utility4);
+ setupCoprocessor(utility3);
+ }
+
+ private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
+ return ReplicationPeerConfig.newBuilder()
+ .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
+ }
+
+ private void setupCoprocessor(HBaseTestingUtility cluster){
+ cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
+ try {
+ r.getCoprocessorHost()
+ .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+ cluster.getConfiguration());
+ } catch (Exception e){
+ LOG.error(e.getMessage(), e);
+ }
+ });
+ }
+
+ @After
+ @Override
+ public void tearDownBase() throws Exception {
+ super.tearDownBase();
+ utility4.getAdmin().removeReplicationPeer(PEER_ID1);
+ utility4.getAdmin().removeReplicationPeer(PEER_ID3);
+ utility3.getAdmin().removeReplicationPeer(PEER_ID4);
+ }
+
+ private static void setupBulkLoadConfigsForCluster(Configuration config,
+ String clusterReplicationId) throws Exception {
+ config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+ File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+ File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
+ + "/hbase-site.xml");
+ config.writeXml(new FileOutputStream(sourceConfigFile));
+ config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+ }
+
+ @Test
+ public void testBulkLoadReplicationActiveActive() throws Exception {
+ Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
+ byte[] row = Bytes.toBytes("001");
+ byte[] value = Bytes.toBytes("v1");
+ assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
+ row = Bytes.toBytes("002");
+ value = Bytes.toBytes("v2");
+ assertBulkLoadConditions(row, value, utility4, peer1TestTable, peer4TestTable, peer3TestTable);
+ row = Bytes.toBytes("003");
+ value = Bytes.toBytes("v3");
+ assertBulkLoadConditions(row, value, utility3, peer1TestTable, peer4TestTable, peer3TestTable);
+ //Additional wait to make sure no extra bulk load happens
+ Thread.sleep(400);
+ //We have 3 bulk load events (1 initiated on each cluster).
+ //Each event gets 3 counts (the originator cluster, plus the two peers),
+ //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
+ assertEquals(9, BULK_LOADS_COUNT.get());
+ }
+
+ private void assertBulkLoadConditions(byte[] row, byte[] value,
+ HBaseTestingUtility utility, Table...tables) throws Exception {
+ BULK_LOAD_LATCH = new CountDownLatch(3);
+ bulkLoadOnCluster(row, value, utility);
+ assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+ assertTableHasValue(tables[0], row, value);
+ assertTableHasValue(tables[1], row, value);
+ assertTableHasValue(tables[2], row, value);
+ }
+
+ private void bulkLoadOnCluster(byte[] row, byte[] value,
+ HBaseTestingUtility cluster) throws Exception {
+ String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
+ Path bulkLoadFilePath = new Path(bulkLoadFile);
+ copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
+ LoadIncrementalHFiles bulkLoadHFilesTool =
+ new LoadIncrementalHFiles(cluster.getConfiguration());
+ Map<byte[], List<Path>> family2Files = new HashMap<>();
+ List<Path> files = new ArrayList<>();
+ files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
+ family2Files.put(Bytes.toBytes("f"), files);
+ bulkLoadHFilesTool.run(family2Files, tableName);
+ }
+
+ private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+ Path bulkLoadDir = new Path("/bulk_dir/f");
+ cluster.getFileSystem().mkdirs(bulkLoadDir);
+ cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+ }
+
+ private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+ Get get = new Get(row);
+ Result result = table.get(get);
+ assertTrue(result.advance());
+ assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
+ }
+
+ private String createHFileForFamilies(byte[] row, byte[] value,
+ Configuration clusterConfig) throws IOException {
+ CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+ cellBuilder.setRow(row)
+ .setFamily(TestReplicationBase.famName)
+ .setQualifier(Bytes.toBytes("1"))
+ .setValue(value)
+ .setType(Cell.Type.Put);
+
+ HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+ // TODO We need a way to do this without creating files
+ File hFileLocation = testFolder.newFile();
+ FSDataOutputStream out =
+ new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+ try {
+ hFileFactory.withOutputStream(out);
+ hFileFactory.withFileContext(new HFileContext());
+ HFile.Writer writer = hFileFactory.create();
+ try {
+ writer.append(new KeyValue(cellBuilder.build()));
+ } finally {
+ writer.close();
+ }
+ } finally {
+ out.close();
+ }
+ return hFileLocation.getAbsoluteFile().getAbsolutePath();
+ }
+
+ public static class BulkReplicationTestObserver implements RegionCoprocessor {
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(new RegionObserver() {
+ @Override
+ public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> familyPaths) throws IOException {
+ BULK_LOADS_COUNT.incrementAndGet();
+ }
+
+ @Override
+ public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
+ throws IOException {
+ BULK_LOAD_LATCH.countDown();
+ }
+ });
+ }
+ }
+}
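
Assuming a standard Maven setup, one typical way to run just the new test from the repository root (-Dtest and -pl are plain Maven/Surefire options, nothing added by this patch):

    mvn test -pl hbase-server -Dtest=TestBulkLoadReplication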