You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/09/21 13:02:23 UTC
[hbase] branch master updated: HBASE-22380 break circle replication
when doing bulkload (#566)
This is an automated email from the ASF dual-hosted git repository.
wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 3ae353c HBASE-22380 break circle replication when doing bulkload (#566)
3ae353c is described below
commit 3ae353cbf45c11b057386cc3cb74d6e9c8430a08
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Sat Sep 21 14:02:18 2019 +0100
HBASE-22380 break circle replication when doing bulkload (#566)
Signed-off-by: Josh Elser <el...@apache.org>
---
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 12 +-
.../hbase/shaded/protobuf/RequestConverter.java | 11 +-
.../src/main/protobuf/Client.proto | 1 +
hbase-protocol-shaded/src/main/protobuf/WAL.proto | 1 +
.../hbase/client/AsyncClusterConnection.java | 30 +-
.../hbase/client/AsyncClusterConnectionImpl.java | 14 +-
.../apache/hadoop/hbase/regionserver/HRegion.java | 9 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 8 +-
.../hbase/regionserver/SecureBulkLoadManager.java | 10 +-
.../replication/regionserver/HFileReplicator.java | 6 +-
.../replication/regionserver/ReplicationSink.java | 44 ++-
.../hadoop/hbase/tool/BulkLoadHFilesTool.java | 8 +-
.../regionserver/TestBulkLoadReplication.java | 316 +++++++++++++++++++++
.../hbase/replication/TestReplicationBase.java | 2 +-
14 files changed, 439 insertions(+), 33 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 353801f..9ccfad8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2566,12 +2566,22 @@ public final class ProtobufUtil {
* @return The WAL log marker for bulk loads.
*/
public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
+ ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+ Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+ return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
+ storeFilesSize, bulkloadSeqId, null);
+ }
+
+ public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
- Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+ Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
BulkLoadDescriptor.Builder desc =
BulkLoadDescriptor.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+ if(clusterIds != null) {
+ desc.addAllClusterIds(clusterIds);
+ }
for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 352197a..ae3cd3f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -568,7 +568,7 @@ public final class RequestConverter {
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
- false);
+ false, null);
}
/**
@@ -583,9 +583,9 @@ public final class RequestConverter {
* @return a bulk load request
*/
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
- final List<Pair<byte[], String>> familyPaths,
- final byte[] regionName, boolean assignSeqNum,
- final Token<?> userToken, final String bulkToken, boolean copyFiles) {
+ final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+ final Token<?> userToken, final String bulkToken, boolean copyFiles,
+ List<String> clusterIds) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -623,6 +623,9 @@ public final class RequestConverter {
request.setBulkToken(bulkToken);
}
request.setCopyFile(copyFiles);
+ if (clusterIds != null) {
+ request.addAllClusterIds(clusterIds);
+ }
return request.build();
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 14abb08..07d8d71 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
optional DelegationToken fs_token = 4;
optional string bulk_token = 5;
optional bool copy_file = 6 [default = false];
+ repeated string cluster_ids = 7;
message FamilyPath {
required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 35a179c..c103075 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -150,6 +150,7 @@ message BulkLoadDescriptor {
required bytes encoded_region_name = 2;
repeated StoreDescriptor stores = 3;
required int64 bulkload_seq_num = 4;
+ repeated string cluster_ids = 5;
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 45dc8be..4b4d68f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -75,13 +75,39 @@ public interface AsyncClusterConnection extends AsyncConnection {
CompletableFuture<String> prepareBulkLoad(TableName tableName);
/**
- * Securely bulk load a list of HFiles.
- * @param row used to locate the region
+ * @deprecated Use bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
+ * byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken,
+ * boolean copyFiles, List<String> clusterIds)
*/
+ @Deprecated
CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles);
/**
+ * Securely bulk load a list of HFiles, passing an additional list of cluster ids that tracks
+ * the clusters where the given bulk load has already been processed
+ * (important for bulk load replication).
+ *
+ * Defined as default here to avoid breaking callers who rely on the bulkLoad version that
+ * does not expect additional clusterIds param.
+ *
+ * @param tableName the target table
+ * @param familyPaths pairs of column family and HDFS path of the files to be loaded into it
+ * @param row row key
+ * @param assignSeqNum seq num for the event on WAL
+ * @param userToken user token
+ * @param bulkToken bulk load token
+ * @param copyFiles flag for copying the loaded hfiles
+ * @param clusterIds list of cluster ids where the given bulk load has already been processed.
+ * @return a future that completes with {@code true} if the HFiles were loaded. This default
+ * implementation does not support {@code clusterIds}: it returns an exceptionally completed
+ * future (never {@code null}), so implementations are expected to override it.
+ */
+ default CompletableFuture<Boolean> bulkLoad(TableName tableName,
+ List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum,
+ Token<?> userToken, String bulkToken, boolean copyFiles, List<String> clusterIds) {
+ // Returning null here would make callers that chain on the future (e.g. via
+ // FutureUtils.addListener) fail with an NPE far from the real cause. Silently delegating to
+ // the legacy overload would drop clusterIds and re-open the replication loop this parameter
+ // exists to break, so fail explicitly instead.
+ CompletableFuture<Boolean> future = new CompletableFuture<>();
+ future.completeExceptionally(new UnsupportedOperationException(
+ getClass().getName() + " does not support bulkLoad with clusterIds"));
+ return future;
+ }
+
+ /**
* Clean up after finishing bulk load, no matter success or not.
*/
CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 328b959..f167770 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -108,14 +108,22 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
@Override
public CompletableFuture<Boolean> bulkLoad(TableName tableName,
- List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
- String bulkToken, boolean copyFiles) {
+ List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+ String bulkToken, boolean copyFiles) {
+ return bulkLoad(tableName, familyPaths, row, assignSeqNum,
+ userToken, bulkToken, copyFiles, null);
+ }
+
+ @Override
+ public CompletableFuture<Boolean> bulkLoad(TableName tableName,
+ List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+ String bulkToken, boolean copyFiles, List<String> clusterIds) {
return callerFactory.<Boolean> single().table(tableName).row(row)
.action((controller, loc, stub) -> ConnectionUtils
.<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
null,
(rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
- userToken, bulkToken, copyFiles),
+ userToken, bulkToken, copyFiles, clusterIds),
(s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
.call();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5594327..b27794e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6142,7 +6142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
- return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
}
/**
@@ -6187,11 +6187,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @param bulkLoadListener Internal hooks enabling massaging/preparation of a
* file about to be bulk loaded
* @param copyFile always copy hfiles if true
+ * @param clusterIds ids from clusters that had already handled the given bulkload event.
* @return Map from family to List of store file paths if successful, null if failed recoverably
* @throws IOException if failed unrecoverably.
*/
public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
- boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+ boolean assignSeqId, BulkLoadListener bulkLoadListener,
+ boolean copyFile, List<String> clusterIds) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6366,8 +6368,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
- storeFiles,
- storeFilesSizes, seqId);
+ storeFiles, storeFilesSizes, seqId, clusterIds);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
loadDescriptor, mvcc);
} catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fe1c43a..661311e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2386,6 +2386,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
final BulkLoadHFileRequest request) throws ServiceException {
long start = EnvironmentEdgeManager.currentTime();
+ List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+ if(clusterIds.contains(this.regionServer.clusterId)){
+ return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+ } else {
+ clusterIds.add(this.regionServer.clusterId);
+ }
try {
checkOpen();
requestCount.increment();
@@ -2410,7 +2416,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
// secure bulk load
Map<byte[], List<Path>> map =
- regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+ regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
builder.setLoaded(map != null);
if (map != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 2dfb43b..ad19473 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -214,7 +214,12 @@ public class SecureBulkLoadManager {
}
public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
- final BulkLoadHFileRequest request) throws IOException {
+ final BulkLoadHFileRequest request) throws IOException {
+ return secureBulkLoadHFiles(region, request, null);
+ }
+
+ public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+ final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -290,7 +295,8 @@ public class SecureBulkLoadManager {
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
- new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+ new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+ clusterIds);
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index ed0ea15..169f747 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -91,17 +91,19 @@ public class HFileReplicator {
private ThreadPoolExecutor exec;
private int maxCopyThreads;
private int copiesPerThread;
+ private List<String> sourceClusterIds;
public HFileReplicator(Configuration sourceClusterConf,
String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
- AsyncClusterConnection connection) throws IOException {
+ AsyncClusterConnection connection, List<String> sourceClusterIds) throws IOException {
this.sourceClusterConf = sourceClusterConf;
this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
this.bulkLoadHFileMap = tableQueueMap;
this.conf = conf;
this.connection = connection;
+ this.sourceClusterIds = sourceClusterIds;
userProvider = UserProvider.instantiate(conf);
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -156,6 +158,8 @@ public class HFileReplicator {
BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
// Set the staging directory which will be used by BulkLoadHFilesTool for loading the data
loader.setBulkToken(stagingDir.toString());
+ //updating list of cluster ids where this bulkload event has already been processed
+ loader.setClusterIds(sourceClusterIds);
for (int count = 0; !queue.isEmpty(); count++) {
if (count != 0) {
LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with " +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index e30e637..6c3f70c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -175,9 +175,7 @@ public class ReplicationSink {
// invocation of this method per table and cluster id.
Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
- // Map of table name Vs list of pair of family and list of hfile paths from its namespace
- Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+ Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
for (WALEntry entry : entries) {
TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
if (this.walEntrySinkFilter != null) {
@@ -204,10 +202,19 @@ public class ReplicationSink {
Cell cell = cells.current();
// Handle bulk load hfiles replication
if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+ BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ if(bulkLoadsPerClusters == null) {
+ bulkLoadsPerClusters = new HashMap<>();
+ }
+ // Map of table name Vs list of pair of family and list of
+ // hfile paths from its namespace
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+ bulkLoadsPerClusters.get(bld.getClusterIdsList());
if (bulkLoadHFileMap == null) {
bulkLoadHFileMap = new HashMap<>();
+ bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
}
- buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+ buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
} else {
// Handle wal replication
if (isNewRowOrType(previousCell, cell)) {
@@ -245,14 +252,26 @@ public class ReplicationSink {
LOG.debug("Finished replicating mutations.");
}
- if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
- LOG.debug("Started replicating bulk loaded data.");
- HFileReplicator hFileReplicator =
- new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+ if(bulkLoadsPerClusters != null) {
+ for (Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> entry :
+ bulkLoadsPerClusters.entrySet()) {
+ Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+ if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+ entry.getKey().toString());
+ }
+ HFileReplicator hFileReplicator =
+ new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
- getConnection());
- hFileReplicator.replicate();
- LOG.debug("Finished replicating bulk loaded data.");
+ getConnection(), entry.getKey());
+ hFileReplicator.replicate();
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Finished replicating bulk loaded data from cluster id: {}",
+ entry.getKey().toString());
+ }
+ }
+ }
}
int size = entries.size();
@@ -267,8 +286,7 @@ public class ReplicationSink {
private void buildBulkLoadHFileMap(
final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
- Cell cell) throws IOException {
- BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+ BulkLoadDescriptor bld) throws IOException {
List<StoreDescriptor> storesList = bld.getStoresList();
int storesSize = storesList.size();
for (int j = 0; j < storesSize; j++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 6efb4b5..0e2e029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -131,6 +131,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
private final AtomicInteger numRetries = new AtomicInteger(0);
private String bulkToken;
+ private List<String> clusterIds = new ArrayList<>();
+
public BulkLoadHFilesTool(Configuration conf) {
// make a copy, just to be sure we're not overriding someone else's config
super(new Configuration(conf));
@@ -377,7 +379,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
.collect(Collectors.toList());
CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
- fsDelegationToken.getUserToken(), bulkToken, copyFiles), (loaded, error) -> {
+ fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
if (error != null) {
LOG.error("Encountered unrecoverable error from region server", error);
if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -997,6 +999,10 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
this.bulkToken = bulkToken;
}
+ /**
+ * Sets the ids of the clusters that have already processed the bulk load performed by this
+ * tool. The ids are sent along with each bulk load request so that a region server whose own
+ * cluster id is already listed can skip the load instead of applying (and replicating) it again.
+ * @param clusterIds cluster ids to propagate; {@code null} is treated as an empty list. The list
+ * is copied, so later changes by the caller do not affect this tool.
+ */
+ public void setClusterIds(List<String> clusterIds) {
+ this.clusterIds = clusterIds == null ? new ArrayList<>() : new ArrayList<>(clusterIds);
+ }
+
private void usage() {
System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
+ "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
new file mode 100644
index 0000000..7a49989
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of the defined test clusters, it performs a bulk load, asserting that the values in the
+ * bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads, with the
+ * given replication topology all these bulk loads should get replicated only once on each peer. To assert this,
+ * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the
+ * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying
+ * we are not entering the infinite loop condition addressed by HBASE-22380.
+ */
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestBulkLoadReplication extends TestReplicationBase {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
+
+ protected static final Logger LOG =
+ LoggerFactory.getLogger(TestBulkLoadReplication.class);
+
+ // Values written to HConstants.REPLICATION_CLUSTER_ID for clusters 1, 2 and 3 respectively;
+ // also used as the per-cluster config sub-folder names under testFolder.
+ private static final String PEER1_CLUSTER_ID = "peer1";
+ private static final String PEER2_CLUSTER_ID = "peer2";
+ private static final String PEER3_CLUSTER_ID = "peer3";
+
+ // Replication peer ids under which cluster 1 and cluster 3 are registered (on cluster 2).
+ // PEER_ID2 comes from TestReplicationBase.
+ private static final String PEER_ID1 = "1";
+ private static final String PEER_ID3 = "3";
+
+ // Total number of bulk loads observed across all clusters; the test expects exactly 9.
+ // NOTE(review): presumably incremented by BulkReplicationTestObserver — confirm.
+ private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+ // Re-created with a count of 3 before each bulk load. The test waits for it to reach zero.
+ // NOTE(review): presumably counted down by the observer on other threads; the field is not
+ // volatile, so confirm visibility is acceptable.
+ private static CountDownLatch BULK_LOAD_LATCH;
+
+ // Third mini cluster, added on top of the two managed by TestReplicationBase.
+ private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
+ private static final Configuration CONF3 = UTIL3.getConfiguration();
+
+ // DFS directory from which the generated HFiles are bulk loaded on each cluster.
+ private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
+ // Handle to the test table on cluster 3, opened in startThirdCluster().
+ private static Table htable3;
+
+ @Rule
+ public TestName name = new TestName();
+
+ // Holds one hbase-site.xml per cluster; its root is used as REPLICATION_CONF_DIR.
+ @ClassRule
+ public static TemporaryFolder testFolder = new TemporaryFolder();
+
+ /**
+ * Enables bulk load replication and assigns a distinct replication cluster id in the
+ * configuration of each of the three clusters, then starts the clusters.
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Configs are prepared before any cluster is started so the settings are picked up.
+ setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
+ setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
+ setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
+ setupConfig(UTIL3, "/3");
+ // NOTE(review): assumed to start clusters 1 and 2 — confirm in TestReplicationBase.
+ TestReplicationBase.setUpBeforeClass();
+ startThirdCluster();
+ }
+
+ /**
+ * Starts cluster 3 on the ZooKeeper cluster used by UTIL1, creates the test table there and opens
+ * {@link #htable3}. The table has {@code famName} with global replication scope and
+ * {@code noRepfamName} with the default scope.
+ */
+ private static void startThirdCluster() throws Exception {
+ LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
+ UTIL3.setZkCluster(UTIL1.getZkCluster());
+ UTIL3.startMiniCluster(NUM_SLAVES1);
+
+ TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+ // NOTE(review): connection3 is never closed here; it backs htable3, which outlives this method.
+ Connection connection3 = ConnectionFactory.createConnection(CONF3);
+ try (Admin admin3 = connection3.getAdmin()) {
+ admin3.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+ }
+ UTIL3.waitUntilAllRegionsAssigned(tableName);
+ htable3 = connection3.getTable(tableName);
+ }
+
+ /**
+ * Wires the replication topology "1 <-> 2 <-> 3" and installs the counting coprocessor on the
+ * test table regions of all three clusters.
+ */
+ @Before
+ @Override
+ public void setUpBase() throws Exception {
+ // "super.setUpBase()" already sets replication from 1->2;
+ // the subsequent lines then add 2->1, 2->3 and 3->2.
+ // So we end up with the following topology: "1 <-> 2 <-> 3"
+ super.setUpBase();
+ ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
+ ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
+ ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
+ // Adds cluster1 as a remote peer on cluster2
+ UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+ // Adds cluster3 as a remote peer on cluster2
+ UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+ // Adds cluster2 as a remote peer on cluster3
+ UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
+ // NOTE(review): the second argument is not used by setupCoprocessor.
+ setupCoprocessor(UTIL1, "cluster1");
+ setupCoprocessor(UTIL2, "cluster2");
+ setupCoprocessor(UTIL3, "cluster3");
+ }
+
+ /**
+ * Builds a replication peer config pointing at the given mini cluster, using the serial-peer
+ * setting chosen by the base class.
+ */
+ private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
+ String clusterKey = util.getClusterKey();
+ boolean serial = isSerialPeer();
+ return ReplicationPeerConfig.newBuilder().setClusterKey(clusterKey).setSerial(serial).build();
+ }
+
+ /**
+ * Loads BulkReplicationTestObserver on every region of the test table in the given mini cluster
+ * and tags each observer instance with that cluster's key. A failure on one region is logged and
+ * the remaining regions are still processed.
+ * NOTE(review): the {@code name} argument is currently unused.
+ */
+ private void setupCoprocessor(HBaseTestingUtility cluster, String name){
+ for (HRegion region : cluster.getHBaseCluster().getRegions(tableName)) {
+ try {
+ region.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class,
+ 0, cluster.getConfiguration());
+ TestBulkLoadReplication.BulkReplicationTestObserver observer = region.getCoprocessorHost()
+ .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+ observer.clusterName = cluster.getClusterKey();
+ } catch (Exception e) {
+ LOG.error(e.getMessage(), e);
+ }
+ }
+ }
+
+ /**
+ * Runs the base class teardown, then removes the extra peers added by setUpBase: peers 1 and 3
+ * from cluster 2, and peer 2 from cluster 3.
+ */
+ @After
+ @Override
+ public void tearDownBase() throws Exception {
+ super.tearDownBase();
+ Admin admin2 = UTIL2.getAdmin();
+ admin2.removeReplicationPeer(PEER_ID1);
+ admin2.removeReplicationPeer(PEER_ID3);
+ UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
+ }
+
+ /**
+ * Enables bulk load replication and sets the replication cluster id on {@code config}. It then
+ * writes the config as {@code <testFolder>/<clusterReplicationId>/hbase-site.xml} and points
+ * REPLICATION_CONF_DIR at the test folder root.
+ * REPLICATION_CONF_DIR is set after writing, so it is not part of the written file.
+ */
+ private static void setupBulkLoadConfigsForCluster(Configuration config,
+ String clusterReplicationId) throws Exception {
+ config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+ File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+ File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
+ + "/hbase-site.xml");
+ // Close the stream so the file descriptor is not leaked for the life of the test JVM.
+ try (FileOutputStream out = new FileOutputStream(sourceConfigFile)) {
+ config.writeXml(out);
+ }
+ config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+ }
+
+ /**
+ * Bulk loads one row on each of the three clusters and checks that every row reaches all
+ * clusters. It also checks that, overall, exactly 9 bulk loads happened (no replication loop).
+ */
+ @Test
+ public void testBulkLoadReplicationActiveActive() throws Exception {
+ // Tables are closed when the test ends; closing them does not close the shared connections.
+ try (Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+ Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName)) {
+ byte[] row = Bytes.toBytes("001");
+ byte[] value = Bytes.toBytes("v1");
+ assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable);
+ row = Bytes.toBytes("002");
+ value = Bytes.toBytes("v2");
+ assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable);
+ row = Bytes.toBytes("003");
+ value = Bytes.toBytes("v3");
+ assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable);
+ //Additional wait to make sure no extra bulk load happens
+ Thread.sleep(400);
+ //We have 3 bulk load events (1 initiated on each cluster).
+ //Each event gets 3 counts (the originator cluster, plus the two peers),
+ //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
+ assertEquals(9, BULK_LOADS_COUNT.get());
+ }
+ }
+
+ /**
+ * Bulk loads {@code row}/{@code value} on {@code utility}, then checks that every given table
+ * serves that value.
+ * Before checking, it waits up to one minute for one bulk-load event per table.
+ * This assumes one table per cluster in the replication topology, so {@code tables.length} is
+ * also the number of postBulkLoadHFile events a single load produces (originator plus peers).
+ * @param row row key to load
+ * @param value value to load under that row
+ * @param utility cluster on which the bulk load is initiated
+ * @param tables one table handle per cluster; all must end up containing the value
+ */
+ private void assertBulkLoadConditions(byte[] row, byte[] value,
+ HBaseTestingUtility utility, Table...tables) throws Exception {
+ BULK_LOAD_LATCH = new CountDownLatch(tables.length);
+ bulkLoadOnCluster(row, value, utility);
+ assertTrue("Timed out waiting for the bulk load to reach all " + tables.length + " clusters",
+ BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+ for (Table table : tables) {
+ assertTableHasValue(table, row, value);
+ }
+ }
+
+ /**
+ * Bulk loads a single-cell HFile for {@code row}/{@code value} into the test table on
+ * {@code cluster}.
+ * The HFile is written locally, copied under BULK_LOAD_BASE_DIR on the cluster's DFS, and that
+ * directory is then bulk loaded.
+ */
+ private void bulkLoadOnCluster(byte[] row, byte[] value,
+ HBaseTestingUtility cluster) throws Exception {
+ Configuration clusterConf = cluster.getConfiguration();
+ String localHFilePath = createHFileForFamilies(row, value, clusterConf);
+ copyToHdfs(localHFilePath, cluster.getDFSCluster());
+ new BulkLoadHFilesTool(clusterConf).bulkLoad(tableName, BULK_LOAD_BASE_DIR);
+ }
+
+ /**
+ * Copies the local file at {@code bulkLoadFilePath} into {@code BULK_LOAD_BASE_DIR/f} on the
+ * given DFS cluster, creating that directory first.
+ * The {@code f} sub-directory is presumably the column family name the bulk load tool
+ * expects; confirm against famName.
+ */
+ private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+ Path familyDir = new Path(BULK_LOAD_BASE_DIR, "f");
+ cluster.getFileSystem().mkdirs(familyDir);
+ Path localHFile = new Path(bulkLoadFilePath);
+ cluster.getFileSystem().copyFromLocalFile(localHFile, familyDir);
+ }
+
+ /**
+ * Asserts that a Get for {@code row} on {@code table} returns at least one cell.
+ * It also asserts that {@code Result#value()} equals {@code value}, compared as strings.
+ */
+ private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+ Result fetched = table.get(new Get(row));
+ assertTrue(fetched.advance());
+ String expected = Bytes.toString(value);
+ String actual = Bytes.toString(fetched.value());
+ assertEquals(expected, actual);
+ }
+
+ /**
+ * Writes a new HFile in the local test folder.
+ * The file holds a single Put cell for {@code row}: family {@code TestReplicationBase.famName},
+ * qualifier {@code "1"}, and the given {@code value}.
+ * @param row row key of the cell
+ * @param value cell value
+ * @param clusterConfig configuration used to obtain the HFile writer factory
+ * @return absolute local path of the written HFile
+ * @throws IOException if the file cannot be created or written
+ */
+ private String createHFileForFamilies(byte[] row, byte[] value,
+ Configuration clusterConfig) throws IOException {
+ CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+ cellBuilder.setRow(row)
+ .setFamily(TestReplicationBase.famName)
+ .setQualifier(Bytes.toBytes("1"))
+ .setValue(value)
+ .setType(Cell.Type.Put);
+
+ HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+ // TODO We need a way to do this without creating files
+ File hFileLocation = testFolder.newFile();
+ // try-with-resources closes in reverse order: writer, then the wrapping stream, then the raw
+ // file stream. If a close fails after another exception, the original exception is kept
+ // (the close failure is attached as suppressed) instead of being masked.
+ try (FileOutputStream fileOut = new FileOutputStream(hFileLocation);
+ FSDataOutputStream out = new FSDataOutputStream(fileOut, null)) {
+ hFileFactory.withOutputStream(out);
+ hFileFactory.withFileContext(new HFileContext());
+ try (HFile.Writer writer = hFileFactory.create()) {
+ writer.append(new KeyValue(cellBuilder.build()));
+ }
+ }
+ return hFileLocation.getAbsoluteFile().getAbsolutePath();
+ }
+
+ public static class BulkReplicationTestObserver implements RegionCoprocessor {
+
+ String clusterName;
+ AtomicInteger bulkLoadCounts = new AtomicInteger();
+
+ @Override
+ public Optional<RegionObserver> getRegionObserver() {
+ return Optional.of(new RegionObserver() {
+
+ @Override
+ public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+ List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
+ throws IOException {
+ BULK_LOAD_LATCH.countDown();
+ BULK_LOADS_COUNT.incrementAndGet();
+ LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+ bulkLoadCounts.addAndGet(1));
+ }
+ });
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index f576e75..ad88aa4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -170,7 +170,7 @@ public class TestReplicationBase {
htable1.put(puts);
}
- private static void setupConfig(HBaseTestingUtility util, String znodeParent) {
+ protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
Configuration conf = util.getConfiguration();
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
// We don't want too many edits per batch sent to the ReplicationEndpoint to trigger