Posted to commits@hbase.apache.org by wc...@apache.org on 2019/10/29 11:16:09 UTC

[hbase] branch branch-2 updated: HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing the bulkload replication issue raised in HBASE-22380)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new d2a027d  HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing the bulkload replication issue raised in HBASE-22380)
d2a027d is described below

commit d2a027d8be0723571759e18688bcc1f5c7865a8e
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Mon Oct 28 20:17:05 2019 +0000

    HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing the bulkload replication issue raised in HBASE-22380)
    
    Signed-off-by: Josh Elser <el...@apache.org>
    (cherry picked from commit 4d414020bb3bfd7f214d2a599426be700df772b2, with conflicts resolved)
---
 .../hadoop/hbase/client/SecureBulkLoadClient.java  |   8 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  11 +-
 .../hbase/shaded/protobuf/RequestConverter.java    |   5 +-
 .../src/main/protobuf/Client.proto                 |   1 +
 hbase-protocol-shaded/src/main/protobuf/WAL.proto  |   1 +
 .../mob/compactions/PartitionedMobCompactor.java   |   1 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   7 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   2 +-
 .../hbase/regionserver/SecureBulkLoadManager.java  |   2 +-
 .../replication/regionserver/ReplicationSink.java  |  24 ++--
 .../apache/hadoop/hbase/tool/BulkLoadHFiles.java   |   1 +
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |  11 +-
 .../regionserver/TestBulkLoadReplication.java      | 136 +++++++++++++++++----
 13 files changed, 159 insertions(+), 51 deletions(-)
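
In short, the patch threads a new boolean "replicate" (defaulting to true) from the bulk load RPC request, through the WAL bulk load marker written by HRegion, down to ReplicationSink, which now skips descriptors flagged replicate=false. A minimal sketch of the resulting client-side contract; this is not part of the patch, it only uses the signatures added in the diff below, and the arguments are assumed to be prepared by the caller:

    import java.util.List;
    import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
    import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
    import org.apache.hadoop.hbase.util.Pair;
    import org.apache.hadoop.security.token.Token;

    public class ReplicateFlagSketch {
      /** Builds a request whose resulting bulk load will not be replicated. */
      static BulkLoadHFileRequest buildNonReplicatingRequest(
          List<Pair<byte[], String>> familyPaths, byte[] regionName,
          Token<?> userToken, String bulkToken) {
        return RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName,
            true /* assignSeqNum */, userToken, bulkToken,
            false /* copyFiles */, null /* clusterIds */,
            false /* replicate: keep this load out of replication */);
      }
    }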

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7e3166c..cb258e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -116,7 +116,7 @@ public class SecureBulkLoadClient {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   /**
@@ -138,17 +138,17 @@ public class SecureBulkLoadClient {
     final Token<?> userToken, final String bulkToken,
     boolean copyFiles) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken,
-      boolean copyFiles, List<String> clusterIds) throws IOException {
+      boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken, copyFiles, clusterIds);
+          userToken, bulkToken, copyFiles, clusterIds, replicate);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
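
Note that the two pre-existing overloads above now delegate with replicate=true, so callers compiled against the old signatures keep replicating as before. A sketch of the equivalence (stub, paths, and tokens assumed from the surrounding class):

    // Old call site, unchanged by this patch:
    secureClient.secureBulkLoadHFiles(stub, famPaths, regionName, true,
        userToken, bulkToken);
    // ...now behaves exactly like the widened overload with the added defaults:
    secureClient.secureBulkLoadHFiles(stub, famPaths, regionName, true,
        userToken, bulkToken, false /* copyFiles */, null /* clusterIds */,
        true /* replicate */);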
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 0a2063d..5223551 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2582,16 +2582,19 @@ public final class ProtobufUtil {
     ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
     Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-      storeFilesSize, bulkloadSeqId, null);
+      storeFilesSize, bulkloadSeqId, null, true);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId,
+      List<String> clusterIds, boolean replicate) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+          .setTableName(ProtobufUtil.toProtoTableName(tableName))
+          .setEncodedRegionName(encodedRegionName)
+          .setBulkloadSeqNum(bulkloadSeqId)
+          .setReplicate(replicate);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 7dc8645..151a454 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -566,7 +566,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false, null);
+        false, null, true);
   }
 
   /**
@@ -583,7 +583,7 @@ public final class RequestConverter {
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
         final Token<?> userToken, final String bulkToken, boolean copyFiles,
-          List<String> clusterIds) {
+          List<String> clusterIds, boolean replicate) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -624,6 +624,7 @@ public final class RequestConverter {
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
+    request.setReplicate(replicate);
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d71..a22c623 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
+  optional bool replicate = 8 [default = true];
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index c103075..fd622cf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
+  optional bool replicate = 6 [default = true];
 }
 
 /**
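
Both new proto fields declare [default = true], which is what makes the change safe for rolling upgrades: a request or WAL entry serialized before this patch simply omits the field, and the proto2 generated getter falls back to the default, so pre-existing bulk loads keep replicating. A minimal illustration, with bld assumed to be a parsed BulkLoadDescriptor:

    // Field 6 is absent from descriptors written before the upgrade:
    boolean replicate = bld.getReplicate(); // returns true when the field is unset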
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 09d9d98..ed29902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -848,6 +848,7 @@ public class PartitionedMobCompactor extends MobCompactor {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+      bulkload.disableReplication();
       bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
           connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 5b894a5..2423c08 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6099,7 +6099,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+      null, true);
   }
 
   /**
@@ -6150,7 +6151,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-        boolean copyFile, List<String> clusterIds) throws IOException {
+        boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6325,7 +6326,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                   UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles, storeFilesSizes, seqId, clusterIds);
+                  storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
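
The descriptor written into the WAL here is the same one the sink inspects on the receiving cluster, which is how the flag survives the trip between clusters. A sketch of the read side, assuming a cell carrying the WALEdit.BULK_LOAD qualifier (names taken from the ReplicationSink hunk further below):

    static boolean shouldReplicate(Cell cell) throws IOException {
      // The marker cell embeds the serialized BulkLoadDescriptor, replicate flag included.
      return WALEdit.getBulkLoadDescriptor(cell).getReplicate();
    }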
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 00e6169..32d75f4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2402,7 +2402,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         try {
           map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-              request.getCopyFile(), clusterIds);
+              request.getCopyFile(), clusterIds, request.getReplicate());
         } finally {
           if (region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 0badf2a..d0f3943 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -295,7 +295,7 @@ public class SecureBulkLoadManager {
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
                 new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-              clusterIds);
+              clusterIds, request.getReplicate());
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6e68760..ae0a732 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,20 @@ public class ReplicationSink {
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            if(bulkLoadsPerClusters == null) {
-              bulkLoadsPerClusters = new HashMap<>();
-            }
-            // Map of table name Vs list of pair of family and list of
-            // hfile paths from its namespace
-            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-              bulkLoadsPerClusters.get(bld.getClusterIdsList());
-            if (bulkLoadHFileMap == null) {
-              bulkLoadHFileMap = new HashMap<>();
-              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            if(bld.getReplicate()) {
+              if (bulkLoadsPerClusters == null) {
+                bulkLoadsPerClusters = new HashMap<>();
+              }
+              // Map of table name Vs list of pair of family and list of
+              // hfile paths from its namespace
+              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+                bulkLoadsPerClusters.get(bld.getClusterIdsList());
+              if (bulkLoadHFileMap == null) {
+                bulkLoadHFileMap = new HashMap<>();
+                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+              }
+              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627a..702ed75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -97,4 +97,5 @@ public interface BulkLoadHFiles {
   static BulkLoadHFiles create(Configuration conf) {
     return new BulkLoadHFilesTool(conf);
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index c5ba5a7..16ed114 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -157,6 +157,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private List<String> clusterIds = new ArrayList<>();
 
+  private boolean replicate = true;
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
    * the case where a region has split during the process of the load. When this happens, the HFile
@@ -547,7 +549,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           try (Table table = conn.getTable(getTableName())) {
             secureClient = new SecureBulkLoadClient(getConf(), table);
             success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
+              assignSeqIds, fsDelegationToken.getUserToken(),
+                bulkToken, copyFile, clusterIds, replicate);
           }
           return success ? regionName : null;
         } finally {
@@ -1267,6 +1270,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   /**
+   * Disables replication for these bulkloaded files.
+   */
+  public void disableReplication(){
+    this.replicate = false;
+  }
+  /**
    * Infers region boundaries for a new table.
    * <p>
    * Parameter: <br>
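
Usage of the new switch mirrors the PartitionedMobCompactor change above. A hedged end-to-end sketch; the staging path and table name are illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.RegionLocator;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;

    Configuration conf = HBaseConfiguration.create();
    try (Connection conn = ConnectionFactory.createConnection(conf);
        Admin admin = conn.getAdmin();
        Table table = conn.getTable(TableName.valueOf("my_table"));
        RegionLocator locator = conn.getRegionLocator(table.getName())) {
      LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
      loader.disableReplication(); // every load by this instance carries replicate=false
      loader.doBulkLoad(new Path("/staging/my_table"), admin, table, locator);
    }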
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 028277f..6fd7288 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,10 +24,16 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +48,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -54,14 +61,21 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -77,14 +91,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Integration test for bulk load replication. Defines two clusters, with two way replication.
- * Performs a bulk load on cluster defined by UTIL1 first, asserts the Cell on the bulk loaded file
- * gets into the related table in UTIL1, then also validates the same got replicated to cluster
- * UTIL2. Then, bulk loads another file into UTIL2, and checks if related values are present on
- * UTIL2, and also gets replicated to UTIL1.
- * It also defines a preBulkLoad coprocessor that is added to all test table regions on each of the
- * clusters, in order to count amount of times bulk load actually gets invoked. This is to certify
- * we are not entered in the infinite loop condition addressed by HBASE-22380.
+ * Integration test for bulk load replication. Defines three clusters, with the following
+ * replication topology: "1 <-> 2 <-> 3" (active-active between 1 and 2, and active-active between
+ * 2 and 3).
+ *
+ * For each of defined test clusters, it performs a bulk load, asserting values on bulk loaded file
+ * gets replicated to other two peers. Since we are doing 3 bulk loads, with the given replication
+ * topology all these bulk loads should get replicated only once on each peer. To assert this,
+ * this test defines a preBulkLoad coprocessor and adds it to all test table regions, on each of the
+ * clusters. This CP counts the amount of times bulk load actually gets invoked, certifying
+ * we are not entering the infinite loop condition addressed by HBASE-22380.
  */
 @Category({ ReplicationTests.class, MediumTests.class})
 public class TestBulkLoadReplication extends TestReplicationBase {
@@ -103,11 +119,14 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   private static final String PEER_ID1 = "1";
   private static final String PEER_ID3 = "3";
 
-  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static AtomicInteger BULK_LOADS_COUNT;
   private static CountDownLatch BULK_LOAD_LATCH;
 
   private static final HBaseTestingUtility UTIL3 = new HBaseTestingUtility();
   private static final Configuration CONF3 = UTIL3.getConfiguration();
+
+  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
   private static Table htable3;
 
   @Rule
@@ -132,7 +151,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL3.startMiniCluster(NUM_SLAVES1);
 
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+        .setMobEnabled(true)
+        .setMobThreshold(4000)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
@@ -147,6 +168,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   @Before
   @Override
   public void setUpBase() throws Exception {
+    //"super.setUpBase()" already sets replication from 1->2,
+    //then on the subsequent lines, sets 2->1, 2->3 and 3->2.
+    //So we have following topology: "1 <-> 2 <->3"
     super.setUpBase();
     ReplicationPeerConfig peer1Config = getPeerConfigForCluster(UTIL1);
     ReplicationPeerConfig peer2Config = getPeerConfigForCluster(UTIL2);
@@ -157,9 +181,10 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL2.getAdmin().addReplicationPeer(PEER_ID3, peer3Config);
     //adds cluster2 as a remote peer on cluster3
     UTIL3.getAdmin().addReplicationPeer(PEER_ID2, peer2Config);
-    setupCoprocessor(UTIL1);
-    setupCoprocessor(UTIL2);
-    setupCoprocessor(UTIL3);
+    setupCoprocessor(UTIL1, "cluster1");
+    setupCoprocessor(UTIL2, "cluster2");
+    setupCoprocessor(UTIL3, "cluster3");
+    BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
   private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@@ -167,12 +192,19 @@ public class TestBulkLoadReplication extends TestReplicationBase {
       .setClusterKey(util.getClusterKey()).setSerial(isSerialPeer()).build();
   }
 
-  private void setupCoprocessor(HBaseTestingUtility cluster){
+  private void setupCoprocessor(HBaseTestingUtility cluster, String name){
     cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
       try {
-        r.getCoprocessorHost()
-          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
-            cluster.getConfiguration());
+        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
+          findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+        if(cp == null) {
+          r.getCoprocessorHost().
+            load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+              cluster.getConfiguration());
+          cp = r.getCoprocessorHost().
+            findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+          cp.clusterName = cluster.getClusterKey();
+        }
       } catch (Exception e){
         LOG.error(e.getMessage(), e);
       }
@@ -221,6 +253,31 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }
 
+  @Test
+  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+    Path path = createMobFiles(UTIL3);
+    ColumnFamilyDescriptor descriptor =
+      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+    ExecutorService pool = null;
+    try {
+      pool = Executors.newFixedThreadPool(1);
+      PartitionedMobCompactor compactor =
+        new PartitionedMobCompactor(UTIL3.getConfiguration(), UTIL3.getTestFileSystem(), tableName,
+          descriptor, pool);
+      BULK_LOAD_LATCH = new CountDownLatch(1);
+      BULK_LOADS_COUNT.set(0);
+      compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
+      assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+      Thread.sleep(400);
+      assertEquals(1, BULK_LOADS_COUNT.get());
+    } finally {
+      if(pool != null && !pool.isTerminated()) {
+        pool.shutdownNow();
+      }
+    }
+  }
+
+
   private void assertBulkLoadConditions(byte[] row, byte[] value,
       HBaseTestingUtility utility, Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -236,11 +293,11 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     String bulkLoadFilePath = createHFileForFamilies(row, value, cluster.getConfiguration());
     copyToHdfs(bulkLoadFilePath, cluster.getDFSCluster());
     BulkLoadHFilesTool bulkLoadHFilesTool = new BulkLoadHFilesTool(cluster.getConfiguration());
-    bulkLoadHFilesTool.bulkLoad(tableName, new Path("/bulk_dir"));
+    bulkLoadHFilesTool.bulkLoad(tableName, BULK_LOAD_BASE_DIR);
   }
 
   private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
-    Path bulkLoadDir = new Path("/bulk_dir/f");
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR, "f");
     cluster.getFileSystem().mkdirs(bulkLoadDir);
     cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
   }
@@ -281,22 +338,53 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
+  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+    Path testDir = FSUtils.getRootDir(util.getConfiguration());
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    MobFileName mobFileName = null;
+    byte[] mobFileStartRow = new byte[32];
+    for (byte rowKey : Bytes.toBytes("01234")) {
+      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+        UUID.randomUUID().toString().replaceAll("-", ""));
+      StoreFileWriter mobFileWriter =
+        new StoreFileWriter.Builder(util.getConfiguration(),
+          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      long now = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < 10; i++) {
+          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+          byte[] dummyData = new byte[5000];
+          new Random().nextBytes(dummyData);
+          mobFileWriter.append(
+            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+        }
+      } finally {
+        mobFileWriter.close();
+      }
+    }
+    return basePath;
+  }
+
   public static class BulkReplicationTestObserver implements RegionCoprocessor {
 
+    String clusterName;
+    AtomicInteger bulkLoadCounts = new AtomicInteger();
+
     @Override
     public Optional<RegionObserver> getRegionObserver() {
       return Optional.of(new RegionObserver() {
-        @Override
-        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
-          List<Pair<byte[], String>> familyPaths) throws IOException {
-            BULK_LOADS_COUNT.incrementAndGet();
-        }
 
         @Override
         public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
           List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
             throws IOException {
           BULK_LOAD_LATCH.countDown();
+          BULK_LOADS_COUNT.incrementAndGet();
+          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+            bulkLoadCounts.addAndGet(1));
         }
       });
     }