You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/10/02 20:20:44 UTC

[hbase] branch branch-1.4 updated: HBASE-23101 Backport HBASE-22380 to branch-1

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.4 by this push:
     new f574982  HBASE-23101 Backport HBASE-22380 to branch-1
f574982 is described below

commit f574982f040d71d75ee53c424c97dccb439d760e
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Wed Oct 2 17:49:14 2019 +0100

    HBASE-23101 Backport HBASE-22380 to branch-1
    
    Fixes #680
    
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    
    Conflicts:
    	hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
---
 .../client/coprocessor/SecureBulkLoadClient.java   |  21 +-
 .../apache/hadoop/hbase/protobuf/ProtobufUtil.java |  60 +++-
 .../hadoop/hbase/protobuf/RequestConverter.java    |  23 +-
 .../hbase/protobuf/generated/ClientProtos.java     | 307 +++++++++++++++++----
 .../protobuf/generated/SecureBulkLoadProtos.java   | 233 ++++++++++++++--
 .../hadoop/hbase/protobuf/generated/WALProtos.java | 220 ++++++++++++++-
 hbase-protocol/src/main/protobuf/Client.proto      |   1 +
 .../src/main/protobuf/SecureBulkLoad.proto         |   1 +
 hbase-protocol/src/main/protobuf/WAL.proto         |   1 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java     |   9 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   4 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |   2 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   8 +-
 .../apache/hadoop/hbase/regionserver/Region.java   |   5 +-
 .../replication/regionserver/HFileReplicator.java  |   5 +-
 .../replication/regionserver/ReplicationSink.java  |  46 ++-
 .../security/access/SecureBulkLoadEndpoint.java    |   4 +-
 .../hadoop/hbase/regionserver/TestBulkLoad.java    |  18 +-
 .../regionserver/TestBulkLoadReplication.java      | 299 ++++++++++++++++++++
 .../regionserver/TestHRegionReplayEvents.java      |   2 +-
 .../hbase/regionserver/wal/TestWALReplay.java      |   4 +-
 21 files changed, 1134 insertions(+), 139 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java
index 05db8d4..bfbd8dd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/coprocessor/SecureBulkLoadClient.java
@@ -117,10 +117,24 @@ public class SecureBulkLoadClient {
     }
   }
 
+  /**
+   * @deprecated
+   * @param familyPaths
+   * @param userToken
+   * @param bulkToken
+   * @param startRow
+   * @return
+   * @throws IOException
+   */
   public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
-                         final Token<?> userToken,
-                         final String bulkToken,
-                         final byte[] startRow) throws IOException {
+    final Token<?> userToken, final String bulkToken,
+    final byte[] startRow) throws IOException {
+    return this.bulkLoadHFiles(familyPaths, userToken, bulkToken, startRow, null);
+  }
+
+  public boolean bulkLoadHFiles(final List<Pair<byte[], String>> familyPaths,
+    final Token<?> userToken, final String bulkToken,
+    final byte[] startRow, List<String> clusterIds) throws IOException {
     // we never want to send a batch of HFiles to all regions, thus cannot call
     // HTable#coprocessorService methods that take start and end rowkeys; see HBASE-9639
     try {
@@ -151,6 +165,7 @@ public class SecureBulkLoadClient {
           SecureBulkLoadProtos.SecureBulkLoadHFilesRequest.newBuilder()
             .setFsToken(protoDT)
             .addAllFamilyPath(protoFamilyPaths)
+            .addAllClusterIds(clusterIds != null ? clusterIds : new ArrayList<String>())
             .setBulkToken(bulkToken).build();
 
       ServerRpcController controller = new ServerRpcController();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
index c62064e..77afa1c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ProtobufUtil.java
@@ -1807,12 +1807,31 @@ public final class ProtobufUtil {
    * @param assignSeqNum
    * @return true if all are loaded
    * @throws IOException
+   * @deprecated use bulkLoadHFile(final ClientService.BlockingInterface client,
+   * final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
+   * List<String> clusterIds) instead.
    */
   public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum) throws IOException {
+    return bulkLoadHFile(client, familyPaths, regionName, assignSeqNum, null);
+  }
+
+  /**
+   * A helper to bulk load a list of HFiles using client protocol.
+   *
+   * @param client
+   * @param familyPaths
+   * @param regionName
+   * @param assignSeqNum
+   * @return true if all are loaded
+   * @throws IOException
+   */
+  public static boolean bulkLoadHFile(final ClientService.BlockingInterface client,
+    final List<Pair<byte[], String>> familyPaths,
+    final byte[] regionName, boolean assignSeqNum, List<String> clusterIds) throws IOException {
     BulkLoadHFileRequest request =
-      RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum);
+      RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, clusterIds);
     try {
       BulkLoadHFileResponse response =
         client.bulkLoadHFile(null, request);
@@ -3346,6 +3365,10 @@ public final class ProtobufUtil {
    * Generates a marker for the WAL so that we propagate the notion of a bulk region load
    * throughout the WAL.
    *
+   * @deprecated use toBulkLoadDescriptor(TableName tableName, ByteString encodedRegionName,
+   * Map<byte[], List<Path>> storeFiles, Map<String, Long> storeFilesSize, long bulkloadSeqId,
+   * List<String> clusterIds) instead.
+   *
    * @param tableName         The tableName into which the bulk load is being imported into.
    * @param encodedRegionName Encoded region name of the region which is being bulk loaded.
    * @param storeFiles        A set of store files of a column family are bulk loaded.
@@ -3355,17 +3378,42 @@ public final class ProtobufUtil {
    * @return The WAL log marker for bulk loads.
    */
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
-      ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+    Map<String, Long> storeFilesSize, long bulkloadSeqId) {
+    return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
+      storeFilesSize, bulkloadSeqId, null);
+  }
+
+  /**
+   * Generates a marker for the WAL so that we propagate the notion of a bulk region load
+   * throughout the WAL, keeping track of clusters who already applied the bulk event via
+   * the passed clusterIds parameter.
+   *
+   * @param tableName         The tableName into which the bulk load is being imported into.
+   * @param encodedRegionName Encoded region name of the region which is being bulk loaded.
+   * @param storeFiles        A set of store files of a column family are bulk loaded.
+   * @param storeFilesSize  Map of store files and their lengths
+   * @param bulkloadSeqId     sequence ID (by a force flush) used to create bulk load hfile name
+   * @param clusterIds      The list of cluster Ids with the clusters where the bulk even had
+   *                        already been processed.
+   *
+   * @return The WAL log marker for bulk loads.
+   */
+  public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
+    ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
+    Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
     BulkLoadDescriptor.Builder desc =
-        BulkLoadDescriptor.newBuilder()
+      BulkLoadDescriptor.newBuilder()
         .setTableName(ProtobufUtil.toProtoTableName(tableName))
         .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+    if(clusterIds != null) {
+      desc.addAllClusterIds(clusterIds);
+    }
 
     for (Map.Entry<byte[], List<Path>> entry : storeFiles.entrySet()) {
       WALProtos.StoreDescriptor.Builder builder = StoreDescriptor.newBuilder()
-          .setFamilyName(ByteStringer.wrap(entry.getKey()))
-          .setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
+        .setFamilyName(ByteStringer.wrap(entry.getKey()))
+        .setStoreHomeDir(Bytes.toString(entry.getKey())); // relative to region
       for (Path path : entry.getValue()) {
         String name = path.getName();
         builder.addStoreFile(name);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index 79cbb01..a31ef37 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -558,14 +558,32 @@ public final class RequestConverter {
   /**
    * Create a protocol buffer bulk load request
    *
+   * @deprecated use buildBulkLoadHFileRequest(final List<Pair<byte[], String>> familyPaths,
+   * final byte[] regionName, boolean assignSeqNum, List<String> clusterIds)
+   *
    * @param familyPaths
    * @param regionName
    * @param assignSeqNum
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
+    final List<Pair<byte[], String>> familyPaths,
+    final byte[] regionName, boolean assignSeqNum) {
+    return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, null);
+  }
+
+  /**
+   * Create a protocol buffer bulk load request
+   *
+   * @param familyPaths
+   * @param regionName
+   * @param assignSeqNum
+   * @param clusterIds
+   * @return a bulk load request
+   */
+  public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths,
-      final byte[] regionName, boolean assignSeqNum) {
+      final byte[] regionName, boolean assignSeqNum, List<String> clusterIds) {
     BulkLoadHFileRequest.Builder builder = BulkLoadHFileRequest.newBuilder();
     RegionSpecifier region = buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
@@ -576,6 +594,9 @@ public final class RequestConverter {
       familyPathBuilder.setPath(familyPath.getSecond());
       builder.addFamilyPath(familyPathBuilder.build());
     }
+    if(clusterIds!=null) {
+      builder.addAllClusterIds(clusterIds);
+    }
     builder.setAssignSeqNum(assignSeqNum);
     return builder.build();
   }
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 35fddc2..3c905e8 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -22843,6 +22843,26 @@ public final class ClientProtos {
      * <code>optional bool assign_seq_num = 3;</code>
      */
     boolean getAssignSeqNum();
+
+    // repeated string cluster_ids = 4;
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    java.util.List<java.lang.String>
+    getClusterIdsList();
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    int getClusterIdsCount();
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    java.lang.String getClusterIds(int index);
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    com.google.protobuf.ByteString
+        getClusterIdsBytes(int index);
   }
   /**
    * Protobuf type {@code hbase.pb.BulkLoadHFileRequest}
@@ -22927,6 +22947,14 @@ public final class ClientProtos {
               assignSeqNum_ = input.readBool();
               break;
             }
+            case 34: {
+              if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+                clusterIds_ = new com.google.protobuf.LazyStringArrayList();
+                mutable_bitField0_ |= 0x00000008;
+              }
+              clusterIds_.add(input.readBytes());
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -22938,6 +22966,9 @@ public final class ClientProtos {
         if (((mutable_bitField0_ & 0x00000002) == 0x00000002)) {
           familyPath_ = java.util.Collections.unmodifiableList(familyPath_);
         }
+        if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) {
+          clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -23662,10 +23693,41 @@ public final class ClientProtos {
       return assignSeqNum_;
     }
 
+    // repeated string cluster_ids = 4;
+    public static final int CLUSTER_IDS_FIELD_NUMBER = 4;
+    private com.google.protobuf.LazyStringList clusterIds_;
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    public java.util.List<java.lang.String>
+        getClusterIdsList() {
+      return clusterIds_;
+    }
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    public int getClusterIdsCount() {
+      return clusterIds_.size();
+    }
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    public java.lang.String getClusterIds(int index) {
+      return clusterIds_.get(index);
+    }
+    /**
+     * <code>repeated string cluster_ids = 4;</code>
+     */
+    public com.google.protobuf.ByteString
+        getClusterIdsBytes(int index) {
+      return clusterIds_.getByteString(index);
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       familyPath_ = java.util.Collections.emptyList();
       assignSeqNum_ = false;
+      clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -23702,6 +23764,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000002) == 0x00000002)) {
         output.writeBool(3, assignSeqNum_);
       }
+      for (int i = 0; i < clusterIds_.size(); i++) {
+        output.writeBytes(4, clusterIds_.getByteString(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -23723,6 +23788,15 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(3, assignSeqNum_);
       }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < clusterIds_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(clusterIds_.getByteString(i));
+        }
+        size += dataSize;
+        size += 1 * getClusterIdsList().size();
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -23758,6 +23832,8 @@ public final class ClientProtos {
         result = result && (getAssignSeqNum()
             == other.getAssignSeqNum());
       }
+      result = result && getClusterIdsList()
+          .equals(other.getClusterIdsList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -23783,6 +23859,10 @@ public final class ClientProtos {
         hash = (37 * hash) + ASSIGN_SEQ_NUM_FIELD_NUMBER;
         hash = (53 * hash) + hashBoolean(getAssignSeqNum());
       }
+      if (getClusterIdsCount() > 0) {
+        hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
+        hash = (53 * hash) + getClusterIdsList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -23914,6 +23994,8 @@ public final class ClientProtos {
         }
         assignSeqNum_ = false;
         bitField0_ = (bitField0_ & ~0x00000004);
+        clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -23963,6 +24045,12 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000002;
         }
         result.assignSeqNum_ = assignSeqNum_;
+        if (((bitField0_ & 0x00000008) == 0x00000008)) {
+          clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
+              clusterIds_);
+          bitField0_ = (bitField0_ & ~0x00000008);
+        }
+        result.clusterIds_ = clusterIds_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -24011,6 +24099,16 @@ public final class ClientProtos {
         if (other.hasAssignSeqNum()) {
           setAssignSeqNum(other.getAssignSeqNum());
         }
+        if (!other.clusterIds_.isEmpty()) {
+          if (clusterIds_.isEmpty()) {
+            clusterIds_ = other.clusterIds_;
+            bitField0_ = (bitField0_ & ~0x00000008);
+          } else {
+            ensureClusterIdsIsMutable();
+            clusterIds_.addAll(other.clusterIds_);
+          }
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -24442,6 +24540,99 @@ public final class ClientProtos {
         return this;
       }
 
+      // repeated string cluster_ids = 4;
+      private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      private void ensureClusterIdsIsMutable() {
+        if (!((bitField0_ & 0x00000008) == 0x00000008)) {
+          clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_);
+          bitField0_ |= 0x00000008;
+         }
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public java.util.List<java.lang.String>
+          getClusterIdsList() {
+        return java.util.Collections.unmodifiableList(clusterIds_);
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public int getClusterIdsCount() {
+        return clusterIds_.size();
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public java.lang.String getClusterIds(int index) {
+        return clusterIds_.get(index);
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public com.google.protobuf.ByteString
+          getClusterIdsBytes(int index) {
+        return clusterIds_.getByteString(index);
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public Builder setClusterIds(
+          int index, java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public Builder addClusterIds(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public Builder addAllClusterIds(
+          java.lang.Iterable<java.lang.String> values) {
+        ensureClusterIdsIsMutable();
+        super.addAll(values, clusterIds_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public Builder clearClusterIds() {
+        clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000008);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 4;</code>
+       */
+      public Builder addClusterIdsBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.add(value);
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest)
     }
 
@@ -37378,66 +37569,66 @@ public final class ClientProtos {
       "(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014scan_me" +
       "trics\030\n \001(\0132\025.hbase.pb.ScanMetrics\022\032\n\017mv" +
       "cc_read_point\030\013 \001(\004:\0010\022 \n\006cursor\030\014 \001(\0132\020" +
-      ".hbase.pb.Cursor\"\305\001\n\024BulkLoadHFileReques" +
+      ".hbase.pb.Cursor\"\332\001\n\024BulkLoadHFileReques" +
       "t\022)\n\006region\030\001 \002(\0132\031.hbase.pb.RegionSpeci" +
       "fier\022>\n\013family_path\030\002 \003(\0132).hbase.pb.Bul" +
       "kLoadHFileRequest.FamilyPath\022\026\n\016assign_s" +
-      "eq_num\030\003 \001(\010\032*\n\nFamilyPath\022\016\n\006family\030\001 \002" +
-      "(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileRespons" +
-      "e\022\016\n\006loaded\030\001 \002(\010\"a\n\026CoprocessorServiceC",
-      "all\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023" +
-      "\n\013method_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030" +
-      "CoprocessorServiceResult\022&\n\005value\030\001 \001(\0132" +
-      "\027.hbase.pb.NameBytesPair\"v\n\031CoprocessorS" +
-      "erviceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb" +
-      ".RegionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.p" +
-      "b.CoprocessorServiceCall\"o\n\032CoprocessorS" +
-      "erviceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.p" +
-      "b.RegionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase" +
-      ".pb.NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 ",
-      "\001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.Mutatio" +
-      "nProto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014se" +
-      "rvice_call\030\004 \001(\0132 .hbase.pb.CoprocessorS" +
-      "erviceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002" +
-      "(\0132\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030" +
-      "\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"" +
-      "c\n\017RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005" +
-      ":\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compact" +
-      "ionPressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadS" +
-      "tats\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSp",
-      "ecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionL" +
-      "oadStats\"\336\001\n\021ResultOrException\022\r\n\005index\030" +
-      "\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022" +
-      "*\n\texception\030\003 \001(\0132\027.hbase.pb.NameBytesP" +
-      "air\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.C" +
-      "oprocessorServiceResult\0220\n\tloadStats\030\005 \001" +
-      "(\0132\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Reg" +
-      "ionActionResult\0226\n\021resultOrException\030\001 \003" +
-      "(\0132\033.hbase.pb.ResultOrException\022*\n\texcep" +
-      "tion\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014M",
-      "ultiRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbas" +
-      "e.pb.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n" +
-      "\tcondition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001" +
-      "\n\rMultiResponse\0228\n\022regionActionResult\030\001 " +
-      "\003(\0132\034.hbase.pb.RegionActionResult\022\021\n\tpro" +
-      "cessed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036" +
-      ".hbase.pb.MultiRegionLoadStats*\'\n\013Consis" +
-      "tency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\203\004\n\rClie" +
-      "ntService\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025" +
-      ".hbase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.",
-      "pb.MutateRequest\032\030.hbase.pb.MutateRespon" +
-      "se\0225\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbas" +
-      "e.pb.ScanResponse\022P\n\rBulkLoadHFile\022\036.hba" +
-      "se.pb.BulkLoadHFileRequest\032\037.hbase.pb.Bu" +
-      "lkLoadHFileResponse\022X\n\013ExecService\022#.hba" +
-      "se.pb.CoprocessorServiceRequest\032$.hbase." +
-      "pb.CoprocessorServiceResponse\022d\n\027ExecReg" +
-      "ionServerService\022#.hbase.pb.CoprocessorS" +
-      "erviceRequest\032$.hbase.pb.CoprocessorServ" +
-      "iceResponse\0228\n\005Multi\022\026.hbase.pb.MultiReq",
-      "uest\032\027.hbase.pb.MultiResponseBB\n*org.apa" +
-      "che.hadoop.hbase.protobuf.generatedB\014Cli" +
-      "entProtosH\001\210\001\001\240\001\001"
+      "eq_num\030\003 \001(\010\022\023\n\013cluster_ids\030\004 \003(\t\032*\n\nFam" +
+      "ilyPath\022\016\n\006family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n" +
+      "\025BulkLoadHFileResponse\022\016\n\006loaded\030\001 \002(\010\"a",
+      "\n\026CoprocessorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n" +
+      "\014service_name\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t" +
+      "\022\017\n\007request\030\004 \002(\014\"B\n\030CoprocessorServiceR" +
+      "esult\022&\n\005value\030\001 \001(\0132\027.hbase.pb.NameByte" +
+      "sPair\"v\n\031CoprocessorServiceRequest\022)\n\006re" +
+      "gion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022.\n" +
+      "\004call\030\002 \002(\0132 .hbase.pb.CoprocessorServic" +
+      "eCall\"o\n\032CoprocessorServiceResponse\022)\n\006r" +
+      "egion\030\001 \002(\0132\031.hbase.pb.RegionSpecifier\022&" +
+      "\n\005value\030\002 \002(\0132\027.hbase.pb.NameBytesPair\"\226",
+      "\001\n\006Action\022\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001" +
+      "(\0132\027.hbase.pb.MutationProto\022\032\n\003get\030\003 \001(\013" +
+      "2\r.hbase.pb.Get\0226\n\014service_call\030\004 \001(\0132 ." +
+      "hbase.pb.CoprocessorServiceCall\"k\n\014Regio" +
+      "nAction\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Regio" +
+      "nSpecifier\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003" +
+      "(\0132\020.hbase.pb.Action\"c\n\017RegionLoadStats\022" +
+      "\027\n\014memstoreLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupanc" +
+      "y\030\002 \001(\005:\0010\022\035\n\022compactionPressure\030\003 \001(\005:\001" +
+      "0\"j\n\024MultiRegionLoadStats\022)\n\006region\030\001 \003(",
+      "\0132\031.hbase.pb.RegionSpecifier\022\'\n\004stat\030\002 \003" +
+      "(\0132\031.hbase.pb.RegionLoadStats\"\336\001\n\021Result" +
+      "OrException\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001" +
+      "(\0132\020.hbase.pb.Result\022*\n\texception\030\003 \001(\0132" +
+      "\027.hbase.pb.NameBytesPair\022:\n\016service_resu" +
+      "lt\030\004 \001(\0132\".hbase.pb.CoprocessorServiceRe" +
+      "sult\0220\n\tloadStats\030\005 \001(\0132\031.hbase.pb.Regio" +
+      "nLoadStatsB\002\030\001\"x\n\022RegionActionResult\0226\n\021" +
+      "resultOrException\030\001 \003(\0132\033.hbase.pb.Resul" +
+      "tOrException\022*\n\texception\030\002 \001(\0132\027.hbase.",
+      "pb.NameBytesPair\"x\n\014MultiRequest\022,\n\014regi" +
+      "onAction\030\001 \003(\0132\026.hbase.pb.RegionAction\022\022" +
+      "\n\nnonceGroup\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023." +
+      "hbase.pb.Condition\"\226\001\n\rMultiResponse\0228\n\022" +
+      "regionActionResult\030\001 \003(\0132\034.hbase.pb.Regi" +
+      "onActionResult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020reg" +
+      "ionStatistics\030\003 \001(\0132\036.hbase.pb.MultiRegi" +
+      "onLoadStats*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014" +
+      "\n\010TIMELINE\020\0012\203\004\n\rClientService\0222\n\003Get\022\024." +
+      "hbase.pb.GetRequest\032\025.hbase.pb.GetRespon",
+      "se\022;\n\006Mutate\022\027.hbase.pb.MutateRequest\032\030." +
+      "hbase.pb.MutateResponse\0225\n\004Scan\022\025.hbase." +
+      "pb.ScanRequest\032\026.hbase.pb.ScanResponse\022P" +
+      "\n\rBulkLoadHFile\022\036.hbase.pb.BulkLoadHFile" +
+      "Request\032\037.hbase.pb.BulkLoadHFileResponse" +
+      "\022X\n\013ExecService\022#.hbase.pb.CoprocessorSe" +
+      "rviceRequest\032$.hbase.pb.CoprocessorServi" +
+      "ceResponse\022d\n\027ExecRegionServerService\022#." +
+      "hbase.pb.CoprocessorServiceRequest\032$.hba" +
+      "se.pb.CoprocessorServiceResponse\0228\n\005Mult",
+      "i\022\026.hbase.pb.MultiRequest\032\027.hbase.pb.Mul" +
+      "tiResponseBB\n*org.apache.hadoop.hbase.pr" +
+      "otobuf.generatedB\014ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -37551,7 +37742,7 @@ public final class ClientProtos {
           internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_BulkLoadHFileRequest_descriptor,
-              new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", });
+              new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "ClusterIds", });
           internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor =
             internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0);
           internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java
index 8521ba8..7891480 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/SecureBulkLoadProtos.java
@@ -74,6 +74,26 @@ public final class SecureBulkLoadProtos {
      */
     com.google.protobuf.ByteString
         getBulkTokenBytes();
+
+    // repeated string cluster_ids = 5;
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    java.util.List<java.lang.String>
+    getClusterIdsList();
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    int getClusterIdsCount();
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    java.lang.String getClusterIds(int index);
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    com.google.protobuf.ByteString
+        getClusterIdsBytes(int index);
   }
   /**
    * Protobuf type {@code hbase.pb.SecureBulkLoadHFilesRequest}
@@ -157,6 +177,14 @@ public final class SecureBulkLoadProtos {
               bulkToken_ = input.readBytes();
               break;
             }
+            case 42: {
+              if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+                clusterIds_ = new com.google.protobuf.LazyStringArrayList();
+                mutable_bitField0_ |= 0x00000010;
+              }
+              clusterIds_.add(input.readBytes());
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -168,6 +196,9 @@ public final class SecureBulkLoadProtos {
         if (((mutable_bitField0_ & 0x00000001) == 0x00000001)) {
           familyPath_ = java.util.Collections.unmodifiableList(familyPath_);
         }
+        if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+          clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -317,11 +348,42 @@ public final class SecureBulkLoadProtos {
       }
     }
 
+    // repeated string cluster_ids = 5;
+    public static final int CLUSTER_IDS_FIELD_NUMBER = 5;
+    private com.google.protobuf.LazyStringList clusterIds_;
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public java.util.List<java.lang.String>
+        getClusterIdsList() {
+      return clusterIds_;
+    }
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public int getClusterIdsCount() {
+      return clusterIds_.size();
+    }
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public java.lang.String getClusterIds(int index) {
+      return clusterIds_.get(index);
+    }
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public com.google.protobuf.ByteString
+        getClusterIdsBytes(int index) {
+      return clusterIds_.getByteString(index);
+    }
+
     private void initFields() {
       familyPath_ = java.util.Collections.emptyList();
       assignSeqNum_ = false;
       fsToken_ = org.apache.hadoop.hbase.protobuf.generated.SecureBulkLoadProtos.DelegationToken.getDefaultInstance();
       bulkToken_ = "";
+      clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -361,6 +423,9 @@ public final class SecureBulkLoadProtos {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(4, getBulkTokenBytes());
       }
+      for (int i = 0; i < clusterIds_.size(); i++) {
+        output.writeBytes(5, clusterIds_.getByteString(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -386,6 +451,15 @@ public final class SecureBulkLoadProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(4, getBulkTokenBytes());
       }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < clusterIds_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(clusterIds_.getByteString(i));
+        }
+        size += dataSize;
+        size += 1 * getClusterIdsList().size();
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -426,6 +500,8 @@ public final class SecureBulkLoadProtos {
         result = result && getBulkToken()
             .equals(other.getBulkToken());
       }
+      result = result && getClusterIdsList()
+          .equals(other.getClusterIdsList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -455,6 +531,10 @@ public final class SecureBulkLoadProtos {
         hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER;
         hash = (53 * hash) + getBulkToken().hashCode();
       }
+      if (getClusterIdsCount() > 0) {
+        hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
+        hash = (53 * hash) + getClusterIdsList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -582,6 +662,8 @@ public final class SecureBulkLoadProtos {
         bitField0_ = (bitField0_ & ~0x00000004);
         bulkToken_ = "";
         bitField0_ = (bitField0_ & ~0x00000008);
+        clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -635,6 +717,12 @@ public final class SecureBulkLoadProtos {
           to_bitField0_ |= 0x00000004;
         }
         result.bulkToken_ = bulkToken_;
+        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+          clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
+              clusterIds_);
+          bitField0_ = (bitField0_ & ~0x00000010);
+        }
+        result.clusterIds_ = clusterIds_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -688,6 +776,16 @@ public final class SecureBulkLoadProtos {
           bulkToken_ = other.bulkToken_;
           onChanged();
         }
+        if (!other.clusterIds_.isEmpty()) {
+          if (clusterIds_.isEmpty()) {
+            clusterIds_ = other.clusterIds_;
+            bitField0_ = (bitField0_ & ~0x00000010);
+          } else {
+            ensureClusterIdsIsMutable();
+            clusterIds_.addAll(other.clusterIds_);
+          }
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -1193,6 +1291,99 @@ public final class SecureBulkLoadProtos {
         return this;
       }
 
+      // repeated string cluster_ids = 5;
+      private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      private void ensureClusterIdsIsMutable() {
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+          clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_);
+          bitField0_ |= 0x00000010;
+         }
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public java.util.List<java.lang.String>
+          getClusterIdsList() {
+        return java.util.Collections.unmodifiableList(clusterIds_);
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public int getClusterIdsCount() {
+        return clusterIds_.size();
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public java.lang.String getClusterIds(int index) {
+        return clusterIds_.get(index);
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public com.google.protobuf.ByteString
+          getClusterIdsBytes(int index) {
+        return clusterIds_.getByteString(index);
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder setClusterIds(
+          int index, java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder addClusterIds(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder addAllClusterIds(
+          java.lang.Iterable<java.lang.String> values) {
+        ensureClusterIdsIsMutable();
+        super.addAll(values, clusterIds_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder clearClusterIds() {
+        clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder addClusterIdsBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.add(value);
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.SecureBulkLoadHFilesRequest)
     }
 
@@ -4858,30 +5049,30 @@ public final class SecureBulkLoadProtos {
   static {
     java.lang.String[] descriptorData = {
       "\n\024SecureBulkLoad.proto\022\010hbase.pb\032\013Table." +
-      "proto\032\013HBase.proto\032\014Client.proto\"\266\001\n\033Sec" +
+      "proto\032\013HBase.proto\032\014Client.proto\"\313\001\n\033Sec" +
       "ureBulkLoadHFilesRequest\022>\n\013family_path\030" +
       "\001 \003(\0132).hbase.pb.BulkLoadHFileRequest.Fa" +
       "milyPath\022\026\n\016assign_seq_num\030\002 \001(\010\022+\n\010fs_t" +
       "oken\030\003 \002(\0132\031.hbase.pb.DelegationToken\022\022\n" +
-      "\nbulk_token\030\004 \002(\t\".\n\034SecureBulkLoadHFile" +
-      "sResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017DelegationT" +
-      "oken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002 \001" +
-      "(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"A\n\026Pre",
-      "pareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(\0132" +
-      "\023.hbase.pb.TableName\"-\n\027PrepareBulkLoadR" +
-      "esponse\022\022\n\nbulk_token\030\001 \002(\t\",\n\026CleanupBu" +
-      "lkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\"\031\n\027Cle" +
-      "anupBulkLoadResponse2\256\002\n\025SecureBulkLoadS" +
-      "ervice\022V\n\017PrepareBulkLoad\022 .hbase.pb.Pre" +
-      "pareBulkLoadRequest\032!.hbase.pb.PrepareBu" +
-      "lkLoadResponse\022e\n\024SecureBulkLoadHFiles\022%" +
-      ".hbase.pb.SecureBulkLoadHFilesRequest\032&." +
-      "hbase.pb.SecureBulkLoadHFilesResponse\022V\n",
-      "\017CleanupBulkLoad\022 .hbase.pb.CleanupBulkL" +
-      "oadRequest\032!.hbase.pb.CleanupBulkLoadRes" +
-      "ponseBJ\n*org.apache.hadoop.hbase.protobu" +
-      "f.generatedB\024SecureBulkLoadProtosH\001\210\001\001\240\001" +
-      "\001"
+      "\nbulk_token\030\004 \002(\t\022\023\n\013cluster_ids\030\005 \003(\t\"." +
+      "\n\034SecureBulkLoadHFilesResponse\022\016\n\006loaded" +
+      "\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\nidentifier\030" +
+      "\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n",
+      "\007service\030\004 \001(\t\"A\n\026PrepareBulkLoadRequest" +
+      "\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.TableNam" +
+      "e\"-\n\027PrepareBulkLoadResponse\022\022\n\nbulk_tok" +
+      "en\030\001 \002(\t\",\n\026CleanupBulkLoadRequest\022\022\n\nbu" +
+      "lk_token\030\001 \002(\t\"\031\n\027CleanupBulkLoadRespons" +
+      "e2\256\002\n\025SecureBulkLoadService\022V\n\017PrepareBu" +
+      "lkLoad\022 .hbase.pb.PrepareBulkLoadRequest" +
+      "\032!.hbase.pb.PrepareBulkLoadResponse\022e\n\024S" +
+      "ecureBulkLoadHFiles\022%.hbase.pb.SecureBul" +
+      "kLoadHFilesRequest\032&.hbase.pb.SecureBulk",
+      "LoadHFilesResponse\022V\n\017CleanupBulkLoad\022 ." +
+      "hbase.pb.CleanupBulkLoadRequest\032!.hbase." +
+      "pb.CleanupBulkLoadResponseBJ\n*org.apache" +
+      ".hadoop.hbase.protobuf.generatedB\024Secure" +
+      "BulkLoadProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -4893,7 +5084,7 @@ public final class SecureBulkLoadProtos {
           internal_static_hbase_pb_SecureBulkLoadHFilesRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_SecureBulkLoadHFilesRequest_descriptor,
-              new java.lang.String[] { "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", });
+              new java.lang.String[] { "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "ClusterIds", });
           internal_static_hbase_pb_SecureBulkLoadHFilesResponse_descriptor =
             getDescriptor().getMessageTypes().get(1);
           internal_static_hbase_pb_SecureBulkLoadHFilesResponse_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
index e0efab4..89641bc 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/WALProtos.java
@@ -8854,6 +8854,26 @@ public final class WALProtos {
      * <code>required int64 bulkload_seq_num = 4;</code>
      */
     long getBulkloadSeqNum();
+
+    // repeated string cluster_ids = 5;
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    java.util.List<java.lang.String>
+    getClusterIdsList();
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    int getClusterIdsCount();
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    java.lang.String getClusterIds(int index);
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    com.google.protobuf.ByteString
+        getClusterIdsBytes(int index);
   }
   /**
    * Protobuf type {@code hbase.pb.BulkLoadDescriptor}
@@ -8942,6 +8962,14 @@ public final class WALProtos {
               bulkloadSeqNum_ = input.readInt64();
               break;
             }
+            case 42: {
+              if (!((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+                clusterIds_ = new com.google.protobuf.LazyStringArrayList();
+                mutable_bitField0_ |= 0x00000010;
+              }
+              clusterIds_.add(input.readBytes());
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -8953,6 +8981,9 @@ public final class WALProtos {
         if (((mutable_bitField0_ & 0x00000004) == 0x00000004)) {
           stores_ = java.util.Collections.unmodifiableList(stores_);
         }
+        if (((mutable_bitField0_ & 0x00000010) == 0x00000010)) {
+          clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(clusterIds_);
+        }
         this.unknownFields = unknownFields.build();
         makeExtensionsImmutable();
       }
@@ -9075,11 +9106,42 @@ public final class WALProtos {
       return bulkloadSeqNum_;
     }
 
+    // repeated string cluster_ids = 5;
+    public static final int CLUSTER_IDS_FIELD_NUMBER = 5;
+    private com.google.protobuf.LazyStringList clusterIds_;
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public java.util.List<java.lang.String>
+        getClusterIdsList() {
+      return clusterIds_;
+    }
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public int getClusterIdsCount() {
+      return clusterIds_.size();
+    }
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public java.lang.String getClusterIds(int index) {
+      return clusterIds_.get(index);
+    }
+    /**
+     * <code>repeated string cluster_ids = 5;</code>
+     */
+    public com.google.protobuf.ByteString
+        getClusterIdsBytes(int index) {
+      return clusterIds_.getByteString(index);
+    }
+
     private void initFields() {
       tableName_ = org.apache.hadoop.hbase.protobuf.generated.TableProtos.TableName.getDefaultInstance();
       encodedRegionName_ = com.google.protobuf.ByteString.EMPTY;
       stores_ = java.util.Collections.emptyList();
       bulkloadSeqNum_ = 0L;
+      clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -9127,6 +9189,9 @@ public final class WALProtos {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeInt64(4, bulkloadSeqNum_);
       }
+      for (int i = 0; i < clusterIds_.size(); i++) {
+        output.writeBytes(5, clusterIds_.getByteString(i));
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -9152,6 +9217,15 @@ public final class WALProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeInt64Size(4, bulkloadSeqNum_);
       }
+      {
+        int dataSize = 0;
+        for (int i = 0; i < clusterIds_.size(); i++) {
+          dataSize += com.google.protobuf.CodedOutputStream
+            .computeBytesSizeNoTag(clusterIds_.getByteString(i));
+        }
+        size += dataSize;
+        size += 1 * getClusterIdsList().size();
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -9192,6 +9266,8 @@ public final class WALProtos {
         result = result && (getBulkloadSeqNum()
             == other.getBulkloadSeqNum());
       }
+      result = result && getClusterIdsList()
+          .equals(other.getClusterIdsList());
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -9221,6 +9297,10 @@ public final class WALProtos {
         hash = (37 * hash) + BULKLOAD_SEQ_NUM_FIELD_NUMBER;
         hash = (53 * hash) + hashLong(getBulkloadSeqNum());
       }
+      if (getClusterIdsCount() > 0) {
+        hash = (37 * hash) + CLUSTER_IDS_FIELD_NUMBER;
+        hash = (53 * hash) + getClusterIdsList().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -9353,6 +9433,8 @@ public final class WALProtos {
         }
         bulkloadSeqNum_ = 0L;
         bitField0_ = (bitField0_ & ~0x00000008);
+        clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
         return this;
       }
 
@@ -9406,6 +9488,12 @@ public final class WALProtos {
           to_bitField0_ |= 0x00000004;
         }
         result.bulkloadSeqNum_ = bulkloadSeqNum_;
+        if (((bitField0_ & 0x00000010) == 0x00000010)) {
+          clusterIds_ = new com.google.protobuf.UnmodifiableLazyStringList(
+              clusterIds_);
+          bitField0_ = (bitField0_ & ~0x00000010);
+        }
+        result.clusterIds_ = clusterIds_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -9457,6 +9545,16 @@ public final class WALProtos {
         if (other.hasBulkloadSeqNum()) {
           setBulkloadSeqNum(other.getBulkloadSeqNum());
         }
+        if (!other.clusterIds_.isEmpty()) {
+          if (clusterIds_.isEmpty()) {
+            clusterIds_ = other.clusterIds_;
+            bitField0_ = (bitField0_ & ~0x00000010);
+          } else {
+            ensureClusterIdsIsMutable();
+            clusterIds_.addAll(other.clusterIds_);
+          }
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -9932,6 +10030,99 @@ public final class WALProtos {
         return this;
       }
 
+      // repeated string cluster_ids = 5;
+      private com.google.protobuf.LazyStringList clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+      private void ensureClusterIdsIsMutable() {
+        if (!((bitField0_ & 0x00000010) == 0x00000010)) {
+          clusterIds_ = new com.google.protobuf.LazyStringArrayList(clusterIds_);
+          bitField0_ |= 0x00000010;
+         }
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public java.util.List<java.lang.String>
+          getClusterIdsList() {
+        return java.util.Collections.unmodifiableList(clusterIds_);
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public int getClusterIdsCount() {
+        return clusterIds_.size();
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public java.lang.String getClusterIds(int index) {
+        return clusterIds_.get(index);
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public com.google.protobuf.ByteString
+          getClusterIdsBytes(int index) {
+        return clusterIds_.getByteString(index);
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder setClusterIds(
+          int index, java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.set(index, value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder addClusterIds(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.add(value);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder addAllClusterIds(
+          java.lang.Iterable<java.lang.String> values) {
+        ensureClusterIdsIsMutable();
+        super.addAll(values, clusterIds_);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder clearClusterIds() {
+        clusterIds_ = com.google.protobuf.LazyStringArrayList.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000010);
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>repeated string cluster_ids = 5;</code>
+       */
+      public Builder addClusterIdsBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  ensureClusterIdsIsMutable();
+        clusterIds_.add(value);
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadDescriptor)
     }
 
@@ -12002,22 +12193,23 @@ public final class WALProtos {
       "ANNOT_FLUSH\020\003\"q\n\017StoreDescriptor\022\023\n\013fami" +
       "ly_name\030\001 \002(\014\022\026\n\016store_home_dir\030\002 \002(\t\022\022\n" +
       "\nstore_file\030\003 \003(\t\022\035\n\025store_file_size_byt" +
-      "es\030\004 \001(\004\"\237\001\n\022BulkLoadDescriptor\022\'\n\ntable" +
+      "es\030\004 \001(\004\"\264\001\n\022BulkLoadDescriptor\022\'\n\ntable" +
       "_name\030\001 \002(\0132\023.hbase.pb.TableName\022\033\n\023enco" +
       "ded_region_name\030\002 \002(\014\022)\n\006stores\030\003 \003(\0132\031." +
       "hbase.pb.StoreDescriptor\022\030\n\020bulkload_seq" +
-      "_num\030\004 \002(\003\"\272\002\n\025RegionEventDescriptor\022=\n\n" +
-      "event_type\030\001 \002(\0162).hbase.pb.RegionEventD" +
-      "escriptor.EventType\022\022\n\ntable_name\030\002 \002(\014\022",
-      "\033\n\023encoded_region_name\030\003 \002(\014\022\033\n\023log_sequ" +
-      "ence_number\030\004 \001(\004\022)\n\006stores\030\005 \003(\0132\031.hbas" +
-      "e.pb.StoreDescriptor\022$\n\006server\030\006 \001(\0132\024.h" +
-      "base.pb.ServerName\022\023\n\013region_name\030\007 \001(\014\"" +
-      ".\n\tEventType\022\017\n\013REGION_OPEN\020\000\022\020\n\014REGION_" +
-      "CLOSE\020\001\"\014\n\nWALTrailer*F\n\tScopeType\022\033\n\027RE" +
-      "PLICATION_SCOPE_LOCAL\020\000\022\034\n\030REPLICATION_S" +
-      "COPE_GLOBAL\020\001B?\n*org.apache.hadoop.hbase" +
-      ".protobuf.generatedB\tWALProtosH\001\210\001\000\240\001\001"
+      "_num\030\004 \002(\003\022\023\n\013cluster_ids\030\005 \003(\t\"\272\002\n\025Regi" +
+      "onEventDescriptor\022=\n\nevent_type\030\001 \002(\0162)." +
+      "hbase.pb.RegionEventDescriptor.EventType",
+      "\022\022\n\ntable_name\030\002 \002(\014\022\033\n\023encoded_region_n" +
+      "ame\030\003 \002(\014\022\033\n\023log_sequence_number\030\004 \001(\004\022)" +
+      "\n\006stores\030\005 \003(\0132\031.hbase.pb.StoreDescripto" +
+      "r\022$\n\006server\030\006 \001(\0132\024.hbase.pb.ServerName\022" +
+      "\023\n\013region_name\030\007 \001(\014\".\n\tEventType\022\017\n\013REG" +
+      "ION_OPEN\020\000\022\020\n\014REGION_CLOSE\020\001\"\014\n\nWALTrail" +
+      "er*F\n\tScopeType\022\033\n\027REPLICATION_SCOPE_LOC" +
+      "AL\020\000\022\034\n\030REPLICATION_SCOPE_GLOBAL\020\001B?\n*or" +
+      "g.apache.hadoop.hbase.protobuf.generated" +
+      "B\tWALProtosH\001\210\001\000\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -12071,7 +12263,7 @@ public final class WALProtos {
           internal_static_hbase_pb_BulkLoadDescriptor_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_BulkLoadDescriptor_descriptor,
-              new java.lang.String[] { "TableName", "EncodedRegionName", "Stores", "BulkloadSeqNum", });
+              new java.lang.String[] { "TableName", "EncodedRegionName", "Stores", "BulkloadSeqNum", "ClusterIds", });
           internal_static_hbase_pb_RegionEventDescriptor_descriptor =
             getDescriptor().getMessageTypes().get(7);
           internal_static_hbase_pb_RegionEventDescriptor_fieldAccessorTable = new
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index 83f4e25..5740669 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -373,6 +373,7 @@ message BulkLoadHFileRequest {
   required RegionSpecifier region = 1;
   repeated FamilyPath family_path = 2;
   optional bool assign_seq_num = 3;
+  repeated string cluster_ids = 4;
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
index 12e7cf7..09b911f 100644
--- a/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
+++ b/hbase-protocol/src/main/protobuf/SecureBulkLoad.proto
@@ -32,6 +32,7 @@ message SecureBulkLoadHFilesRequest {
   optional bool assign_seq_num = 2;
   required DelegationToken fs_token = 3;
   required string bulk_token = 4;
+  repeated string cluster_ids = 5;
 }
 
 message SecureBulkLoadHFilesResponse {
diff --git a/hbase-protocol/src/main/protobuf/WAL.proto b/hbase-protocol/src/main/protobuf/WAL.proto
index a888686..de466c6 100644
--- a/hbase-protocol/src/main/protobuf/WAL.proto
+++ b/hbase-protocol/src/main/protobuf/WAL.proto
@@ -144,6 +144,7 @@ message BulkLoadDescriptor {
   required bytes encoded_region_name = 2;
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
+  repeated string cluster_ids = 5;
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 9d7d80b..7a8b960 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -139,6 +139,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private int nrThreads;
   private int depth = 2;
 
+  private List<String> clusterIds = new ArrayList<>();
+
   private LoadIncrementalHFiles() {}
 
   public LoadIncrementalHFiles(Configuration conf) throws Exception {
@@ -146,6 +148,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     initialize();
   }
 
+  public void setClusterIds(List<String> clusterIds) {
+    this.clusterIds = clusterIds;
+  }
+
   public void setDepth(int depth) {
     this.depth = depth;
   }
@@ -873,7 +879,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
               + Bytes.toStringBinary(getRow()) + " with hfile group " + famPaths);
           byte[] regionName = getLocation().getRegionInfo().getRegionName();
           if (!isSecureBulkLoadEndpointAvailable()) {
-            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds);
+            success = ProtobufUtil.bulkLoadHFile(getStub(), famPaths, regionName, assignSeqIds,
+              clusterIds);
           } else {
             try (Table table = conn.getTable(getTableName())) {
               secureClient = new SecureBulkLoadClient(table);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ec1c609..8ffca7d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5834,7 +5834,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
   @Override
   public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
-      BulkLoadListener bulkLoadListener) throws IOException {
+      BulkLoadListener bulkLoadListener, List<String> clusterIds) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
@@ -6010,7 +6010,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                 ByteStringer.wrap(this.getRegionInfo().getEncodedNameAsBytes()), storeFiles,
-                storeFilesSizes, seqId);
+                storeFilesSizes, seqId, clusterIds);
           WALUtil.writeBulkLoadMarkerAndSync(wal, this.htableDescriptor, getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index af0c680..f861b9e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -450,7 +450,7 @@ public class HRegionServer extends HasThread implements
   /**
    * Unique identifier for the cluster we are a part of.
    */
-  private String clusterId;
+  String clusterId;
 
   /**
    * MX Bean for RegionServerInfo
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 68e01ce..17a402b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2152,6 +2152,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   public BulkLoadHFileResponse bulkLoadHFile(final RpcController controller,
       final BulkLoadHFileRequest request) throws ServiceException {
     long start = EnvironmentEdgeManager.currentTime();
+    List<String> clusterIds = new ArrayList<String>(request.getClusterIdsList());
+    if(clusterIds.contains(this.regionServer.clusterId)){
+      return BulkLoadHFileResponse.newBuilder().setLoaded(true).build();
+    } else {
+      clusterIds.add(this.regionServer.clusterId);
+    }
     try {
       checkOpen();
       requestCount.increment();
@@ -2168,7 +2174,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       boolean loaded = false;
       try {
         if (!bypass) {
-          loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+          loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null, clusterIds);
         }
       } finally {
         if (region.getCoprocessorHost() != null) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index 6642220..12ec899 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -552,14 +552,15 @@ public interface Region extends ConfigurationObserver {
    * rows with multiple column families atomically.
    *
    * @param familyPaths List of Pair&lt;byte[] column family, String hfilePath&gt;
+   * @param assignSeqId
    * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
    * file about to be bulk loaded
-   * @param assignSeqId
+   * @param clusterIds
    * @return true if successful, false if failed recoverably
    * @throws IOException if failed unrecoverably.
    */
   boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
-      BulkLoadListener bulkLoadListener) throws IOException;
+    BulkLoadListener bulkLoadListener, List<String> clusterIds) throws IOException;
 
   ///////////////////////////////////////////////////////////////////////////
   // Coprocessors
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index fc9f3d1..1a26568 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -86,17 +86,19 @@ public class HFileReplicator {
   private ThreadPoolExecutor exec;
   private int maxCopyThreads;
   private int copiesPerThread;
+  private List<String> sourceClusterIds;
 
   public HFileReplicator(Configuration sourceClusterConf,
       String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
       Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
-      Connection connection) throws IOException {
+      Connection connection, List<String> sourceClusterIds) throws IOException {
     this.sourceClusterConf = sourceClusterConf;
     this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
     this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
     this.bulkLoadHFileMap = tableQueueMap;
     this.conf = conf;
     this.connection = connection;
+    this.sourceClusterIds = sourceClusterIds;
 
     userProvider = UserProvider.instantiate(conf);
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
@@ -130,6 +132,7 @@ public class HFileReplicator {
       LoadIncrementalHFiles loadHFiles = null;
       try {
         loadHFiles = new LoadIncrementalHFiles(conf);
+        loadHFiles.setClusterIds(sourceClusterIds);
       } catch (Exception e) {
         LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
             + " data.", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 30d25c1..0eb4860 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -157,9 +157,7 @@ public class ReplicationSink {
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
           new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
 
-      // Map of table name Vs list of pair of family and list of hfile paths from its namespace
-      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
-
+      Map<List<String>, Map<String, List<Pair<byte[], List<String>>>>> bulkLoadsPerClusters = null;
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -174,10 +172,19 @@ public class ReplicationSink {
           Cell cell = cells.current();
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+            if(bulkLoadsPerClusters == null) {
+              bulkLoadsPerClusters = new HashMap<>();
+            }
+            // Map of table name Vs list of pair of family and list of
+            // hfile paths from its namespace
+            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+              bulkLoadsPerClusters.get(bld.getClusterIdsList());
             if (bulkLoadHFileMap == null) {
-              bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
+              bulkLoadHFileMap = new HashMap<>();
+              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
@@ -213,14 +220,26 @@ public class ReplicationSink {
         LOG.debug("Finished replicating mutations.");
       }
 
-      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
-        LOG.debug("Started replicating bulk loaded data.");
-        HFileReplicator hFileReplicator =
-            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+      if(bulkLoadsPerClusters != null) {
+        for (Entry<List<String>, Map<String, List<Pair<byte[],
+          List<String>>>>> entry : bulkLoadsPerClusters.entrySet()) {
+          Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = entry.getValue();
+          if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Started replicating bulk loaded data from cluster ids: " +
+                entry.getKey().toString());
+            }
+            HFileReplicator hFileReplicator =
+              new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                 sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
-                getConnection());
-        hFileReplicator.replicate();
-        LOG.debug("Finished replicating bulk loaded data.");
+                getConnection(), entry.getKey());
+            hFileReplicator.replicate();
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Finished replicating bulk loaded data from cluster id: " +
+                entry.getKey().toString());
+            }
+          }
+        }
       }
 
       int size = entries.size();
@@ -235,8 +254,7 @@ public class ReplicationSink {
 
   private void buildBulkLoadHFileMap(
       final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
-      Cell cell) throws IOException {
-    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      BulkLoadDescriptor bld) throws IOException {
     List<StoreDescriptor> storesList = bld.getStoresList();
     int storesSize = storesList.size();
     for (int j = 0; j < storesSize; j++) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index f400fd4..6493ab6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -306,7 +306,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
 
   @Override
   public void secureBulkLoadHFiles(RpcController controller,
-                                   SecureBulkLoadHFilesRequest request,
+                                   final SecureBulkLoadHFilesRequest request,
                                    RpcCallback<SecureBulkLoadHFilesResponse> done) {
     final List<Pair<byte[], String>> familyPaths = new ArrayList<Pair<byte[], String>>();
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
@@ -399,7 +399,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return env.getRegion().bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf));
+                new SecureBulkLoadListener(fs, bulkToken, conf), request.getClusterIdsList());
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
index 5649d8a..d71d081 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoad.java
@@ -122,14 +122,14 @@ public class TestBulkLoad {
       };
     });
     testRegionWithFamiliesAndSpecifiedTableName(tableName, family1)
-        .bulkLoadHFiles(familyPaths, false, null);
+        .bulkLoadHFiles(familyPaths, false, null, null);
     verify(log).sync(anyLong());
   }
 
   @Test
   public void bulkHLogShouldThrowNoErrorAndWriteMarkerWithBlankInput() throws IOException {
     testRegionWithFamilies(family1).bulkLoadHFiles(new ArrayList<Pair<byte[], String>>(),
-      false, null);
+      false, null, null);
   }
 
   @Test
@@ -147,7 +147,7 @@ public class TestBulkLoad {
         return 01L;
       };
     });
-    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
+    testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1), false, null, null);
     verify(log).sync(anyLong());
   }
 
@@ -167,7 +167,7 @@ public class TestBulkLoad {
               };
             });
     testRegionWithFamilies(family1, family2).bulkLoadHFiles(withFamilyPathsFor(family1, family2),
-            false, null);
+            false, null, null);
     verify(log).sync(anyLong());
   }
 
@@ -188,33 +188,33 @@ public class TestBulkLoad {
     });
     TableName tableName = TableName.valueOf("test", "test");
     testRegionWithFamiliesAndSpecifiedTableName(tableName, family1, family2)
-        .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null);
+        .bulkLoadHFiles(withFamilyPathsFor(family1, family2), false, null, null);
     verify(log).sync(anyLong());
   }
 
   @Test(expected = DoNotRetryIOException.class)
   public void shouldCrashIfBulkLoadFamiliesNotInTable() throws IOException {
     testRegionWithFamilies(family1).bulkLoadHFiles(withFamilyPathsFor(family1, family2), false,
-      null);
+      null, null);
   }
 
   @Test(expected = DoNotRetryIOException.class)
   public void bulkHLogShouldThrowErrorWhenFamilySpecifiedAndHFileExistsButNotInTableDescriptor()
       throws IOException {
-    testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null);
+    testRegionWithFamilies().bulkLoadHFiles(withFamilyPathsFor(family1), false, null, null);
   }
 
   @Test(expected = DoNotRetryIOException.class)
   public void shouldThrowErrorIfBadFamilySpecifiedAsFamilyPath() throws IOException {
     testRegionWithFamilies()
         .bulkLoadHFiles(asList(withInvalidColumnFamilyButProperHFileLocation(family1)),
-            false, null);
+            false, null, null);
   }
 
   @Test(expected = FileNotFoundException.class)
   public void shouldThrowErrorIfHFileDoesNotExist() throws IOException {
     List<Pair<byte[], String>> list = asList(withMissingHFileForFamily(family1));
-    testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null);
+    testRegionWithFamilies(family1).bulkLoadHFiles(list, false, null, null);
   }
 
   private Pair<byte[], String> withMissingHFileForFamily(byte[] family) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
new file mode 100644
index 0000000..d8d4930
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CLUSTER_ID;
+import static org.apache.hadoop.hbase.HConstants.REPLICATION_CONF_DIR;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file
+ * gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication
+ * topology all these bulk loads should get replicated only once on each peer. To assert this,
+ * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the
+ * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying
+ * we are not entering the infinite loop condition addressed by HBASE-22380.
+ */
+@Category({ ReplicationTests.class, MediumTests.class})
+public class TestBulkLoadReplication extends TestReplicationBase {
+
+  protected static final Logger LOG =
+    LoggerFactory.getLogger(TestBulkLoadReplication.class);
+
+  private static final String PEER1_CLUSTER_ID = "peer1";
+  private static final String PEER4_CLUSTER_ID = "peer4";
+  private static final String PEER3_CLUSTER_ID = "peer3";
+
+  private static final String PEER_ID1 = "1";
+  private static final String PEER_ID3 = "3";
+  private static final String PEER_ID4 = "4";
+
+  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static CountDownLatch BULK_LOAD_LATCH;
+
+  private static HBaseTestingUtility utility3;
+  private static HBaseTestingUtility utility4;
+  private static Configuration conf3;
+  private static Configuration conf4;
+//  private static Table htable3;
+//  private static Table htable4;
+
+  @Rule
+  public TestName name = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    setupBulkLoadConfigsForCluster(conf1, PEER1_CLUSTER_ID);
+    conf3 = HBaseConfiguration.create(conf1);
+    setupBulkLoadConfigsForCluster(conf3, PEER3_CLUSTER_ID);
+    conf3.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/3");
+    utility3 = new HBaseTestingUtility(conf3);
+    conf4 = HBaseConfiguration.create(conf1);
+    setupBulkLoadConfigsForCluster(conf4, PEER4_CLUSTER_ID);
+    conf4.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/4");
+    utility4 = new HBaseTestingUtility(conf4);
+    TestReplicationBase.setUpBeforeClass();
+    startCluster(utility3, conf3);
+    startCluster(utility4, conf4);
+  }
+
+  private static void startCluster(HBaseTestingUtility util, Configuration configuration)
+      throws Exception {
+    LOG.info("Setup Zk to same one from utility1 and utility4");
+    util.setZkCluster(utility1.getZkCluster());
+    util.startMiniCluster(2);
+
+    HTableDescriptor tableDesc = new HTableDescriptor(tableName);
+    HColumnDescriptor columnDesc = new HColumnDescriptor(famName);
+    columnDesc.setScope(1);
+    tableDesc.addFamily(columnDesc);
+
+    Connection connection = ConnectionFactory.createConnection(configuration);
+    try (Admin admin = connection.getAdmin()) {
+      admin.createTable(tableDesc, HBaseTestingUtility.KEYS_FOR_HBA_CREATE_TABLE);
+    }
+    util.waitUntilAllRegionsAssigned(tableName);
+  }
+
+  @Before
+  public void setUpBase() throws Exception {
+    ReplicationPeerConfig peer1Config = getPeerConfigForCluster(utility1);
+    ReplicationPeerConfig peer4Config = getPeerConfigForCluster(utility4);
+    ReplicationPeerConfig peer3Config = getPeerConfigForCluster(utility3);
+    //adds cluster4 as a remote peer on cluster1
+    getReplicationAdmin(utility1.getConfiguration()).addPeer(PEER_ID4, peer4Config);
+    //adds cluster1 as a remote peer on cluster4
+    ReplicationAdmin admin4 = getReplicationAdmin(utility4.getConfiguration());
+    admin4.addPeer(PEER_ID1, peer1Config);
+    //adds cluster3 as a remote peer on cluster4
+    admin4.addPeer(PEER_ID3, peer3Config);
+    //adds cluster4 as a remote peer on cluster3
+    getReplicationAdmin(utility3.getConfiguration()).addPeer(PEER_ID4, peer4Config);
+    setupCoprocessor(utility1);
+    setupCoprocessor(utility4);
+    setupCoprocessor(utility3);
+  }
+
+  private ReplicationAdmin getReplicationAdmin(Configuration configuration) throws IOException {
+    return new ReplicationAdmin(configuration);
+  }
+
+  private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
+    ReplicationPeerConfig config =  new ReplicationPeerConfig();
+    config.setClusterKey(util.getClusterKey());
+    return config;
+  }
+
+  private void setupCoprocessor(HBaseTestingUtility cluster) throws IOException {
+    for(HRegion region : cluster.getHBaseCluster().getRegions(tableName)){
+      region.getCoprocessorHost().load(TestBulkLoadReplication.BulkReplicationTestObserver.class,
+        0, cluster.getConfiguration());
+    }
+  }
+
+  @After
+  public void tearDownBase() throws Exception {
+    getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID1);
+    getReplicationAdmin(utility4.getConfiguration()).removePeer(PEER_ID3);
+    getReplicationAdmin(utility3.getConfiguration()).removePeer(PEER_ID4);
+  }
+
+  private static void setupBulkLoadConfigsForCluster(Configuration config,
+    String clusterReplicationId) throws Exception {
+    config.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    config.set(REPLICATION_CLUSTER_ID, clusterReplicationId);
+    File sourceConfigFolder = testFolder.newFolder(clusterReplicationId);
+    File sourceConfigFile = new File(sourceConfigFolder.getAbsolutePath()
+      + "/hbase-site.xml");
+    config.writeXml(new FileOutputStream(sourceConfigFile));
+    config.set(REPLICATION_CONF_DIR, testFolder.getRoot().getAbsolutePath());
+  }
+
+  @Test
+  public void testBulkLoadReplicationActiveActive() throws Exception {
+    Table peer1TestTable = utility1.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer4TestTable = utility4.getConnection().getTable(TestReplicationBase.tableName);
+    Table peer3TestTable = utility3.getConnection().getTable(TestReplicationBase.tableName);
+    byte[] row = Bytes.toBytes("001");
+    byte[] value = Bytes.toBytes("v1");
+    assertBulkLoadConditions(row, value, utility1, peer1TestTable, peer4TestTable, peer3TestTable);
+    row = Bytes.toBytes("002");
+    value = Bytes.toBytes("v2");
+    assertBulkLoadConditions(row, value, utility4, peer4TestTable, peer1TestTable, peer3TestTable);
+    row = Bytes.toBytes("003");
+    value = Bytes.toBytes("v3");
+    assertBulkLoadConditions(row, value, utility3, peer3TestTable, peer4TestTable, peer1TestTable);
+    //Additional wait to make sure no extra bulk load happens
+    Thread.sleep(400);
+    //We have 3 bulk load events (1 initiated on each cluster).
+    //Each event gets 3 counts (the originator cluster, plus the two peers),
+    //so BULK_LOADS_COUNT expected value is 3 * 3 = 9.
+    assertEquals(9, BULK_LOADS_COUNT.get());
+  }
+
+  private void assertBulkLoadConditions(byte[] row, byte[] value,
+      HBaseTestingUtility utility, Table...tables) throws Exception {
+    BULK_LOAD_LATCH = new CountDownLatch(3);
+    bulkLoadOnCluster(row, value, utility);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.MINUTES));
+    assertTableHasValue(tables[0], row, value);
+    assertTableHasValue(tables[1], row, value);
+    assertTableHasValue(tables[2], row, value);
+  }
+
+  private void bulkLoadOnCluster(byte[] row, byte[] value,
+      HBaseTestingUtility cluster) throws Exception {
+    String bulkLoadFile = createHFileForFamilies(row, value, cluster.getConfiguration());
+    copyToHdfs(bulkLoadFile, cluster.getDFSCluster());
+    LoadIncrementalHFiles bulkLoadHFilesTool =
+      new LoadIncrementalHFiles(cluster.getConfiguration());
+    bulkLoadHFilesTool.run(new String[]{"/bulk_dir/region1/", tableName.getNameAsString()});
+  }
+
+  private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
+    Path bulkLoadDir = new Path("/bulk_dir/region1/f");
+    cluster.getFileSystem().mkdirs(bulkLoadDir);
+    cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
+  }
+
+  private void assertTableHasValue(Table table, byte[] row, byte[] value) throws Exception {
+    Get get = new Get(row);
+    Result result = table.get(get);
+    assertTrue(result.advance());
+    assertEquals(Bytes.toString(value), Bytes.toString(result.value()));
+  }
+
+  private String createHFileForFamilies(byte[] row, byte[] value,
+      Configuration clusterConfig) throws IOException {
+    final KeyValue kv = new KeyValue(row, famName, Bytes.toBytes("1"), System.currentTimeMillis(),
+      KeyValue.Type.Put, value);
+    final HFile.WriterFactory hFileFactory = HFile.getWriterFactoryNoCache(clusterConfig);
+    // TODO We need a way to do this without creating files
+    final File hFileLocation = testFolder.newFile();
+    final FSDataOutputStream out =
+      new FSDataOutputStream(new FileOutputStream(hFileLocation), null);
+    try {
+      hFileFactory.withOutputStream(out);
+      hFileFactory.withFileContext(new HFileContext());
+      HFile.Writer writer = hFileFactory.create();
+      try {
+        writer.append(kv);
+      } finally {
+        writer.close();
+      }
+    } finally {
+      out.close();
+    }
+    return hFileLocation.getAbsoluteFile().getAbsolutePath();
+  }
+
+  public static class BulkReplicationTestObserver extends BaseRegionObserver {
+
+    @Override
+    public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+      List<Pair<byte[], String>> familyPaths) throws IOException {
+        BULK_LOADS_COUNT.incrementAndGet();
+    }
+
+    @Override
+    public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
+        List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
+      if(hasLoaded) {
+        BULK_LOAD_LATCH.countDown();
+      }
+      return true;
+    }
+
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
index 3a8b557..481e5c9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java
@@ -1480,7 +1480,7 @@ public class TestHRegionReplayEvents {
         randomValues)));
       expectedLoadFileCount++;
     }
-    primaryRegion.bulkLoadHFiles(familyPaths, false, null);
+    primaryRegion.bulkLoadHFiles(familyPaths, false, null, null);
 
     // now replay the edits and the bulk load marker
     reader = createWALReaderForPrimary();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
index 2ea2629..8f374e3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
@@ -364,7 +364,7 @@ public class TestWALReplay {
         Bytes.toBytes("z"), 10);
     List <Pair<byte[],String>>  hfs= new ArrayList<Pair<byte[],String>>(1);
     hfs.add(Pair.newPair(family, f.toString()));
-    region.bulkLoadHFiles(hfs, true, null);
+    region.bulkLoadHFiles(hfs, true, null, null);
 
     // Add an edit so something in the WAL
     byte [] row = tableName.getName();
@@ -438,7 +438,7 @@ public class TestWALReplay {
           Bytes.toBytes(i + "50"), 10);
       hfs.add(Pair.newPair(family, f.toString()));
     }
-    region.bulkLoadHFiles(hfs, true, null);
+    region.bulkLoadHFiles(hfs, true, null, null);
     final int rowsInsertedCount = 31;
     assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));