Posted to commits@hbase.apache.org by wc...@apache.org on 2019/09/21 13:02:23 UTC

[hbase] branch master updated: HBASE-22380 break circle replication when doing bulkload (#566)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 3ae353c  HBASE-22380 break circle replication when doing bulkload (#566)
3ae353c is described below

commit 3ae353cbf45c11b057386cc3cb74d6e9c8430a08
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Sat Sep 21 14:02:18 2019 +0100

    HBASE-22380 break circle replication when doing bulkload (#566)
    
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  12 +-
 .../hbase/shaded/protobuf/RequestConverter.java    |  11 +-
 .../src/main/protobuf/Client.proto                 |   1 +
 hbase-protocol-shaded/src/main/protobuf/WAL.proto  |   1 +
 .../hbase/client/AsyncClusterConnection.java       |  30 +-
 .../hbase/client/AsyncClusterConnectionImpl.java   |  14 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   9 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   8 +-
 .../hbase/regionserver/SecureBulkLoadManager.java  |  10 +-
 .../replication/regionserver/HFileReplicator.java  |   6 +-
 .../replication/regionserver/ReplicationSink.java  |  44 ++-
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      |   8 +-
 .../regionserver/TestBulkLoadReplication.java      | 316 +++++++++++++++++++++
 .../hbase/replication/TestReplicationBase.java     |   2 +-
 14 files changed, 439 insertions(+), 33 deletions(-)
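
In short, the fix threads a list of cluster ids through the whole bulk load path
(request -> WAL marker -> replication sink), so a region server can recognize a
bulk load that its own cluster has already processed and stop re-replicating it.
A minimal sketch of the loop check, using the names from the RSRpcServices change
below (simplified, not the committed code verbatim; localClusterId stands in for
this.regionServer.clusterId):

    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
    if (clusterIds.contains(localClusterId)) {
      // This cluster has already handled this bulk load: ack it as loaded
      // without re-applying it, which breaks the replication cycle.
      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
    }
    // First visit: record ourselves before loading, so downstream peers
    // (and eventually the originating cluster) can skip it.
    clusterIds.add(localClusterId);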

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 353801f..9ccfad8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2566,12 +2566,22 @@ public final class ProtobufUtil {
    * @return The WAL log marker for bulk loads.
    */
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
+    ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+    Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
+      storeFilesSize, bulkloadSeqId, null);
+  }
+
+  public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
         .setTableName(ProtobufUtil.toProtoTableName(tableName))
         .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+    if (clusterIds != null) {
+      desc.addAllClusterIds(clusterIds);
+    }
 
     for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 352197a..ae3cd3f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -568,7 +568,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false);
+        false, null);
   }
 
   /**
@@ -583,9 +583,9 @@ public final class RequestConverter {
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
-      final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum,
-      final Token<?> userToken, final String bulkToken, boolean copyFiles) {
+      final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+        final Token<?> userToken, final String bulkToken, boolean copyFiles,
+          List<String> clusterIds) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -623,6 +623,9 @@ public final class RequestConverter {
       request.setBulkToken(bulkToken);
     }
     request.setCopyFile(copyFiles);
+    if (clusterIds != null) {
+      request.addAllClusterIds(clusterIds);
+    }
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 14abb08..07d8d71 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -378,6 +378,7 @@ message BulkLoadHFileRequest {
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
+  repeated string cluster_ids = 7;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 35a179c..c103075 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -150,6 +150,7 @@ message BulkLoadDescriptor {
   required bytes encoded_region_name = 2;
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
+  repeated string cluster_ids = 5;
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 45dc8be..4b4d68f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -75,13 +75,39 @@ public interface AsyncClusterConnection extends AsyncConnection {
   CompletableFuture<String> prepareBulkLoad(TableName tableName);
 
   /**
-   * Securely bulk load a list of HFiles.
-   * @param row used to locate the region
+   * @deprecated Use the overload that also takes a {@code List<String> clusterIds}
+   * parameter: {@link #bulkLoad(TableName, List, byte[], boolean, Token, String,
+   *     boolean, List)}.
    */
+  @Deprecated
   CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
       byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles);
 
   /**
+   * Securely bulk load a list of HFiles, passing an additional list of cluster ids that
+   * tracks the clusters where the given bulk load has already been processed
+   * (important for bulk load replication).
+   *
+   * Defined as a default method here to avoid breaking callers that rely on the bulkLoad
+   * version without the additional clusterIds param.
+   *
+   * @param tableName the target table
+   * @param familyPaths hdfs paths for the table family dirs containing the files to be loaded
+   * @param row row key
+   * @param assignSeqNum seq num for the event on WAL
+   * @param userToken user token
+   * @param bulkToken bulk load token
+   * @param copyFiles flag for copying the loaded hfiles
+   * @param clusterIds list of cluster ids where the given bulk load has already been processed.
+   * @return a future whose result is true if the region server reported the files as loaded
+   */
+  default CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[],
+    String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken,
+      boolean copyFiles, List<String> clusterIds) {
+    return null;
+  }
+
+  /**
    * Clean up after finishing bulk load, no matter success or not.
    */
   CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken);
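
For illustration, a caller holding an AsyncClusterConnection might drive the new
overload as below (a hedged sketch only: the table name, familyPaths, row, tokens
and clusterIds are hypothetical placeholders set up by the caller):

    CompletableFuture<Boolean> loaded = conn.bulkLoad(
        TableName.valueOf("test_table"),  // target table (placeholder)
        familyPaths,   // List<Pair<byte[], String>>: family -> hfile path
        row,           // row key used to locate the region
        true,          // assignSeqNum: assign a sequence id for the WAL event
        userToken,     // delegation token, as in the old overload
        bulkToken,     // secure bulk load staging token
        false,         // copyFiles
        clusterIds);   // clusters that have already processed this load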
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 328b959..f167770 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -108,14 +108,22 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
 
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
-      List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-      String bulkToken, boolean copyFiles) {
+    List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+    String bulkToken, boolean copyFiles) {
+    return bulkLoad(tableName, familyPaths, row, assignSeqNum,
+      userToken, bulkToken, copyFiles, null);
+  }
+
+  @Override
+  public CompletableFuture<Boolean> bulkLoad(TableName tableName,
+    List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+    String bulkToken, boolean copyFiles, List<String> clusterIds) {
     return callerFactory.<Boolean> single().table(tableName).row(row)
       .action((controller, loc, stub) -> ConnectionUtils
         .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
           null,
           (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
-            userToken, bulkToken, copyFiles),
+            userToken, bulkToken, copyFiles, clusterIds),
           (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
       .call();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5594327..b27794e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6142,7 +6142,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
   }
 
   /**
@@ -6187,11 +6187,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    * file about to be bulk loaded
    * @param copyFile always copy hfiles if true
+   * @param clusterIds ids of clusters that have already handled the given bulk load event.
    * @return Map from family to List of store file paths if successful, null if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
-      boolean assignSeqId, BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
+      boolean assignSeqId, BulkLoadListener bulkLoadListener,
+        boolean copyFile, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6366,8 +6368,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                   UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles,
-                storeFilesSizes, seqId);
+                  storeFiles, storeFilesSizes, seqId, clusterIds);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index fe1c43a..661311e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2386,6 +2386,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
+    List<String> clusterIds = new ArrayList<>(request.getClusterIdsList());
+    if (clusterIds.contains(this.regionServer.clusterId)) {
+      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+    } else {
+      clusterIds.add(this.regionServer.clusterId);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2410,7 +2416,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       }
       // secure bulk load
       Map<byte[], List<Path>> map =
-        regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request);
+        regionServer.secureBulkLoadManager.secureBulkLoadHFiles(region, request, clusterIds);
       BulkLoadHFileResponse.Builder builder = BulkLoadHFileResponse.newBuilder();
       builder.setLoaded(map != null);
       if (map != null) {
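
To trace the effect in the "1 <-> 2 <-> 3" topology the new test exercises: a bulk
load on cluster 1 writes a WAL marker carrying [c1]; cluster 2 replays it, appends
its own id and ships [c1, c2] to both neighbours; when the event comes back to
cluster 1, the check above finds c1 in the list and answers loaded=true without
re-applying the files, which is exactly where the old code started looping.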
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 2dfb43b..ad19473 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -214,7 +214,12 @@ public class SecureBulkLoadManager {
   }
 
   public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
-      final BulkLoadHFileRequest request) throws IOException {
+    final BulkLoadHFileRequest request) throws IOException {
+    return secureBulkLoadHFiles(region, request, null);
+  }
+
+  public Map<byte[], List<Path>> secureBulkLoadHFiles(final HRegion region,
+      final BulkLoadHFileRequest request, List<String> clusterIds) throws IOException {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<>(request.getFamilyPathCount());
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair<>(el.getFamily().toByteArray(), el.getPath()));
@@ -290,7 +295,8 @@ public class SecureBulkLoadManager {
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
+                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
+              clusterIds);
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index ed0ea15..169f747 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -91,17 +91,19 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
+  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
       Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      AsyncClusterConnection connection) throws IOException {
+      AsyncClusterConnection connection, List<String> sourceClusterIds) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
+    this.sourceClusterIds = sourceClusterIds;
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -156,6 +158,8 @@ public class HFileReplicator {
     BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
     // Set the staging directory which will be used by BulkLoadHFilesTool for loading the data
     loader.setBulkToken(stagingDir.toString());
+    // Update the list of cluster ids where this bulk load event has already been processed.
+    loader.setClusterIds(sourceClusterIds);
     for (int count = 0; !queue.isEmpty(); count++) {
       if (count != 0) {
         LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with " +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index e30e637..6c3f70c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -175,9 +175,7 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap = new TreeMap<>();
 
-      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
-      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
       for (WALEntry entry : entries) {
         TableName table = TableName.valueOf(entry.getKey().getTableName().toByteArray());
         if (this.walEntrySinkFilter != null) {
@@ -204,10 +202,19 @@ public class ReplicationSink {
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            if (bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
             if (bulkLoadHFileMap == null) {
               bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -245,14 +252,26 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
-      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-        LOG.debug("Started replicating bulk loaded data.");
-        HFileReplicator hFileReplicator =
-            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if (bulkLoadsPerClusters != null) {
+        for (Entry<List<String>, Map<String, List<Pair<byte[], List<String>>>>> entry :
+            bulkLoadsPerClusters.entrySet()) {
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Started replicating bulk loaded data from cluster ids: {}.",
+                entry.getKey().toString());
+            }
+            HFileReplicator hFileReplicator =
+              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-                getConnection());
-        hFileReplicator.replicate();
-        LOG.debug("Finished replicating bulk loaded data.");
+                getConnection(), entry.getKey());
+            hFileReplicator.replicate();
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Finished replicating bulk loaded data from cluster ids: {}",
+                entry.getKey().toString());
+            }
+          }
+        }
       }
 
       int size = entries.size();
@@ -267,8 +286,7 @@ public class ReplicationSink {
 
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      Cell cell) throws IOException {
-    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      BulkLoadDescriptor bld) throws IOException {
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
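
The grouping introduced above keys each batch of hfiles by the cluster-id list from
its BulkLoadDescriptor, so loads that travelled different paths are replayed with
the correct provenance. As a sketch, the same grouping could be written with
computeIfAbsent (equivalent in behavior, not the committed style):

    Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters =
        new HashMap<>();
    // One hfile map per distinct provenance (the list of cluster ids already visited).
    Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
        bulkLoadsPerClusters.computeIfAbsent(bld.getClusterIdsList(), k -> new HashMap<>());
    buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);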
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 6efb4b5..0e2e029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -131,6 +131,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   private final AtomicInteger numRetries = new AtomicInteger(0);
   private String bulkToken;
 
+  private List<String> clusterIds = new ArrayList<>();
+
   public BulkLoadHFilesTool(Configuration conf) {
     // make a copy, just to be sure we're not overriding someone else's config
     super(new Configuration(conf));
@@ -377,7 +379,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
           .collect(Collectors.toList());
       CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
       FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-        fsDelegationToken.getUserToken(), bulkToken, copyFiles), (loaded, error) -> {
+        fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
           if (error != null) {
             LOG.error("Encountered unrecoverable error from region server", error);
             if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -997,6 +999,10 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     this.bulkToken = bulkToken;
   }
 
+  public void setClusterIds(List<String> clusterIds) {
+    this.clusterIds = clusterIds;
+  }
+
   private void usage() {
     System.err.println("Usage: " + "bin/hbase completebulkload [OPTIONS] "
         + "</PATH/TO/HFILEOUTPUTFORMAT-OUTPUT> <TABLENAME>\n"
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
new file mode 100644
index 0000000..7a49989
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of the defined test clusters, it performs a bulk load, asserting that values from
+ * the bulk loaded file get replicated to the other two peers. Since we are doing 3 bulk loads,
+ * with the given replication topology each bulk load should be replicated only once on each
+ * peer. To assert this, the test defines a postBulkLoadHFile coprocessor and adds it to all
+ * test table regions on each of the clusters. This CP counts the number of times bulk load
+ * actually gets invoked, verifying we are not entering the infinite loop condition addressed
+ * by HBASE-22380.
+ */
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestBulkLoadReplication extends TestReplicationBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadReplication.class);
+
+  protected static final Logger LOG =
+    LoggerFactory.getLogger(TestBulkLoadReplication.class);
+
+  private static final String PEER1_CLUSTER_ID = "peer1";
+  private static final String PEER2_CLUSTER_ID = "peer2";
+  private static final String PEER3_CLUSTER_ID = "peer3";
+
+  private static final String PEER_ID1 = "1";
+  private static final String PEER_ID3 = "3";
+
+  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static CountDownLatch BULK_LOAD_LATCH;
+
+  private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
+  private static final Configuration CONF3 = UTIL3.getConfiguration();
+
+  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
+  private static Table htable3;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setupBulkLoadConfigsForCluster(CONF1, PEER1_CLUSTER_ID);
+    setupBulkLoadConfigsForCluster(CONF2, PEER2_CLUSTER_ID);
+    setupBulkLoadConfigsForCluster(CONF3, PEER3_CLUSTER_ID);
+    setupConfig(UTIL3, "/3");
+    TestReplicationBase.setUpBeforeClass();
+    startThirdCluster();
+  }
+
+  private static void startThirdCluster() throws Exception {
+    LOG.info("Setup Zk to same one from UTIL1 and UTIL2");
+    UTIL3.setZkCluster(UTIL1.getZkCluster());
+    UTIL3.startMiniCluster(NUM_SLAVES1);
+
+    TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+        .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
+
+    Connection connection3 = ConnectionFactory.createConnection(CONF3);
+    try (Admin admin3 = connection3.getAdmin()) {
+      admin3.createTable(table, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    UTIL3.waitUntilAllRegionsAssigned(tableName);
+    htable3 = connection3.getTable(tableName);
+  }
+
+  @Before
+  @Override
+  public void setUpBase() throws Exception {
+    //"super.setUpBase()" already sets replication from 1->2,
+    //then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+    //So we have following topology: "1 <-> 2 <->3"
+    super.setUpBase();
+    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
+    ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
+    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(UTIL3);
+    // adds cluster1 as a remote peer on cluster2
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID1, peer1Config);
+    // adds cluster3 as a remote peer on cluster2
+    UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
+    // adds cluster2 as a remote peer on cluster3
+    UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
+    setupCoprocessor(UTIL1, "cluster1");
+    setupCoprocessor(UTIL2, "cluster2");
+    setupCoprocessor(UTIL3, "cluster3");
+  }
+
+  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
+    return ReplicationPeerConfig.newBuilder()
+      .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
+  }
+
+  private void setupCoprocessor(HBaseTestingUtility cluster, String name) {
+    cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
+      try {
+        r.getCoprocessorHost()
+          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+            cluster.getConfiguration());
+        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost()
+          .findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+        cp.clusterName = cluster.getClusterKey();
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+      }
+    });
+  }
+
+  @After
+  @Override
+  public void tearDownBase() throws Exception {
+    super.tearDownBase();
+    UTIL2.getAdmin().removeReplicationPeer(PEER_ID1);
+    UTIL2.getAdmin().removeReplicationPeer(PEER_ID3);
+    UTIL3.getAdmin().removeReplicationPeer(PEER_ID2);
+  }
+
+  private static void setupBulkLoadConfigsForCluster(Configuration config,
+    String clusterReplicationId) throws Exception {
+    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
+      + "/hbase-site.xml");
+    config.writeXml(new FileOutputStream(sourceConfigFile));
+    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+  }
+
+  @Test
+  public void testBulkLoadReplicationActiveActive() throws Exception {
+    Table peer1TestTable = UTIL1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer2TestTable = UTIL2.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = UTIL3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("001");
+    byte[] value = Bytes.toBytes("v1");
+    assertBulkLoadConditions(row, value, UTIL1, peer1TestTable, peer2TestTable, peer3TestTable);
+    row = Bytes.toBytes("002");
+    value = Bytes.toBytes("v2");
+    assertBulkLoadConditions(row, value, UTIL2, peer1TestTable, peer2TestTable, peer3TestTable);
+    row = Bytes.toBytes("003");
+    value = Bytes.toBytes("v3");
+    assertBulkLoadConditions(row, value, UTIL3, peer1TestTable, peer2TestTable, peer3TestTable);
+    // Additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    // We have 3 bulk load events (1 initiated on each cluster).
+    // Each event gets 3 counts (the originator cluster, plus the two peers),
+    // so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
+    assertEquals(9, BULK_LOADS_COUNT.get());
+  }
+
+  private void assertBulkLoadConditions(byte[] row, byte[] value,
+      HBaseTestingUtility utility, Table...tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(3);
+    bulkLoadOnCluster(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableHasValue(tables[1], row, value);
+    assertTableHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnCluster(byte[] row, byte[] value,
+      HBaseTestingUtility cluster) throws Exception {
+    String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
+    copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
+    BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
+    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
+  }
+
+  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertTrue(result.advance());
+    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
+
+  private String createHFileForFamilies(byte[] row, byte[] value,
+      Configuration clusterConfig) throws IOException {
+    CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
+    cellBuilder.setRow(row)
+      .setFamily(TestReplicationBase.famName)
+      .setQualifier(Bytes.toBytes("1"))
+      .setValue(value)
+      .setType(Cell.Type.Put);
+
+    HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    File hFileLocation = testFolder.newFile();
+    FSDataOutputStream out =
+      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(new KeyValue(cellBuilder.build()));
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  public static class BulkReplicationTestObserver implements RegionCoprocessor {
+
+    String clusterName;
+    AtomicInteger bulkLoadCounts = new AtomicInteger();
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(new RegionObserver() {
+
+        @Override
+        public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+          List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
+            throws IOException {
+          BULK_LOAD_LATCH.countDown();
+          BULK_LOADS_COUNT.incrementAndGet();
+          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+            bulkLoadCounts.addAndGet(1));
+        }
+      });
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index f576e75..ad88aa4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -170,7 +170,7 @@ public class TestReplicationBase {
     htable1.put(puts);
   }
 
-  private static void setupConfig(HBaseTestingUtility util, String znodeParent) {
+  protected static void setupConfig(HBaseTestingUtility util, String znodeParent) {
     Configuration conf = util.getConfiguration();
     conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, znodeParent);
     // We don't want too many edits per batch sent to the ReplicationEndpoint to trigger