Posted to commits@hbase.apache.org by wc...@apache.org on 2019/10/30 08:53:48 UTC

[hbase] branch branch-2.2 updated: HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 97498e2  HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)
97498e2 is described below

commit 97498e283911bf361a2b10e52db6f3619385ecf9
Author: Wellington Chevreuil <wc...@apache.org>
AuthorDate: Wed Oct 30 08:52:16 2019 +0000

    HBASE-23136 PartitionedMobCompactor bulkloaded files shouldn't get replicated (addressing bulkload replication related issue raised in HBASE-22380)
    
    Signed-off-by: Josh Elser <el...@apache.org>
    (cherry picked from commit 4d414020bb3bfd7f214d2a599426be700df772b2, then resolved conflicts)
---
 .../hadoop/hbase/client/SecureBulkLoadClient.java  |   8 +-
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java |  11 ++-
 .../hbase/shaded/protobuf/RequestConverter.java    |   5 +-
 .../src/main/protobuf/Client.proto                 |   1 +
 hbase-protocol-shaded/src/main/protobuf/WAL.proto  |   1 +
 .../mob/compactions/PartitionedMobCompactor.java   |   1 +
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   7 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |   2 +-
 .../hbase/regionserver/SecureBulkLoadManager.java  |   2 +-
 .../replication/regionserver/ReplicationSink.java  |  24 ++---
 .../apache/hadoop/hbase/tool/BulkLoadHFiles.java   |   1 +
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |  11 ++-
 .../regionserver/TestBulkLoadReplication.java      | 107 ++++++++++++++++++---
 13 files changed, 142 insertions(+), 39 deletions(-)
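
In short: the MOB compaction path bulk loads its rewritten ref files back into
the table, and since HBASE-22380 made bulk loads replicable, those
compaction-internal loads started shipping to peer clusters as well. This change
threads a new replicate flag from the bulk load request down to the WAL bulk
load marker, and the replication sink skips markers carrying replicate=false.
The client-facing surface is one new knob, shown here as a minimal sketch
(assuming conf, connection, table and bulkloadDirectory are in scope; it mirrors
the PartitionedMobCompactor hunk below):

    LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
    // New in this change: the resulting WAL marker gets replicate=false,
    // so peer clusters will not pull these files.
    bulkload.disableReplication();
    bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
        connection.getRegionLocator(table.getName()));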

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index 7e3166c..cb258e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -116,7 +116,7 @@ public class SecureBulkLoadClient {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   /**
@@ -138,17 +138,17 @@ public class SecureBulkLoadClient {
     final Token<?> userToken, final String bulkToken,
     boolean copyFiles) throws IOException {
     return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken,
-      bulkToken, false, null);
+      bulkToken, false, null, true);
   }
 
   public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken,
-      boolean copyFiles, List<String> clusterIds) throws IOException {
+      boolean copyFiles, List<String> clusterIds, boolean replicate) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken, copyFiles, clusterIds);
+          userToken, bulkToken, copyFiles, clusterIds, replicate);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
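
For direct users of the client API, the widened nine-argument overload is the
only new surface; the existing shorter overloads now delegate with
replicate=true, so their behaviour is unchanged. A hedged sketch (stub,
familyPaths and the token arguments assumed already in scope):

    // Explicit replicate=false via the new overload; everything else as before.
    boolean loaded = secureClient.secureBulkLoadHFiles(stub, familyPaths,
        regionName, assignSeqNum, userToken, bulkToken,
        false /* copyFiles */, null /* clusterIds */, false /* replicate */);
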
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 753dd55..354e41c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2581,16 +2581,19 @@ public final class ProtobufUtil {
     ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
     Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-      storeFilesSize, bulkloadSeqId, null);
+      storeFilesSize, bulkloadSeqId, null, true);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId,
+      List<String> clusterIds, boolean replicate) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+          .setTableName(ProtobufUtil.toProtoTableName(tableName))
+          .setEncodedRegionName(encodedRegionName)
+          .setBulkloadSeqNum(bulkloadSeqId)
+          .setReplicate(replicate);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }
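
The WAL side follows the same default-true pattern: the existing five-argument
toBulkLoadDescriptor keeps producing replicate=true descriptors, and only
callers passing the new flag can turn replication off. A sketch matching the
HRegion call site below (arguments assumed in scope):

    WALProtos.BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(
        tableName, encodedRegionName, storeFiles, storeFilesSizes, seqId,
        null /* clusterIds */, false /* replicate */);
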
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index d2c818e..1c9bac7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -562,7 +562,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false, null);
+        false, null, true);
   }
 
   /**
@@ -579,7 +579,7 @@ public final class RequestConverter {
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
         final Token<?> userToken, final String bulkToken, boolean copyFiles,
-          List<String> clusterIds) {
+          List<String> clusterIds, boolean replicate) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -620,6 +620,7 @@ public final class RequestConverter {
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
+    request.setReplicate(replicate);
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d71..a22c623 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
+  optional bool replicate = 8 [default = true];
 
   message FamilyPath {
     required bytes family = 1;
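
Both new proto fields are optional with [default = true], which keeps the change
wire-compatible in both directions: requests and WAL entries written by older
code simply lack field 8 here (field 6 in WAL.proto below), and proto2 reads the
default back as true. A minimal sketch of that property (region assumed to be a
prepared RegionSpecifier):

    // Built the way an old client would, without field 8 set:
    BulkLoadHFileRequest fromOldClient =
        BulkLoadHFileRequest.newBuilder().setRegion(region).build();
    assert fromOldClient.getReplicate();   // proto2 default = true
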
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index 9020daf..fe44813 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -145,6 +145,7 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
+  optional bool replicate = 6 [default = true];
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 09d9d98..ed29902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -848,6 +848,7 @@ public class PartitionedMobCompactor extends MobCompactor {
     // bulkload the ref file
     try {
       LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+      bulkload.disableReplication();
       bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
           connection.getRegionLocator(table.getName()));
     } catch (Exception e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index d7d2a15..28a8749 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6101,7 +6101,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+      null, true);
   }
 
   /**
@@ -6152,7 +6153,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-        boolean copyFile, List<String> clusterIds) throws IOException {
+        boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6327,7 +6328,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                   UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles, storeFilesSizes, seqId, clusterIds);
+                  storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5b12430..5e6625e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2393,7 +2393,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
         try {
           map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
-              request.getCopyFile(), clusterIds);
+              request.getCopyFile(), clusterIds, request.getReplicate());
         } finally {
           if (region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postBulkLoadHFile(familyPaths, map);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index f51608d..efc85cf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -294,7 +294,7 @@ public class SecureBulkLoadManager {
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
                 new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-              clusterIds);
+              clusterIds, request.getReplicate());
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6e68760..ae0a732 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,20 @@ public class ReplicationSink {
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            if(bulkLoadsPerClusters == null) {
-              bulkLoadsPerClusters = new HashMap<>();
-            }
-            // Map of table name Vs list of pair of family and list of
-            // hfile paths from its namespace
-            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-              bulkLoadsPerClusters.get(bld.getClusterIdsList());
-            if (bulkLoadHFileMap == null) {
-              bulkLoadHFileMap = new HashMap<>();
-              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            if(bld.getReplicate()) {
+              if (bulkLoadsPerClusters == null) {
+                bulkLoadsPerClusters = new HashMap<>();
+              }
+              // Map of table name Vs list of pair of family and list of
+              // hfile paths from its namespace
+              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
+                bulkLoadsPerClusters.get(bld.getClusterIdsList());
+              if (bulkLoadHFileMap == null) {
+                bulkLoadHFileMap = new HashMap<>();
+                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+              }
+              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
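
The sink-side gate is deliberately simple: the marker still arrives with the
replicated WAL stream, but descriptors carrying replicate=false are ignored
outright, so their hfiles are never copied into this cluster. Stripped of the
per-cluster bookkeeping, the added logic amounts to (a sketch, not the literal
code):

    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
    if (bld.getReplicate()) {
      // group the descriptor's hfiles by source cluster ids and queue
      // them for bulk loading into the local cluster
    }
    // replicate=false markers fall through and are simply dropped
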
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627a..702ed75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -97,4 +97,5 @@ public interface BulkLoadHFiles {
   static BulkLoadHFiles create(Configuration conf) {
     return new BulkLoadHFilesTool(conf);
   }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index c5ba5a7..16ed114 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -157,6 +157,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private List<String> clusterIds = new ArrayList<>();
 
+  private boolean replicate = true;
+
   /**
    * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
    * the case where a region has split during the process of the load. When this happens, the HFile
@@ -547,7 +549,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           try (Table table = conn.getTable(getTableName())) {
             secureClient = new SecureBulkLoadClient(getConf(), table);
             success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile, clusterIds);
+              assignSeqIds, fsDelegationToken.getUserToken(),
+                bulkToken, copyFile, clusterIds, replicate);
           }
           return success ? regionName : null;
         } finally {
@@ -1267,6 +1270,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   /**
+   * Disables replication for these bulkloaded files.
+   */
+  public void disableReplication(){
+    this.replicate = false;
+  }
+  /**
    * Infers region boundaries for a new table.
    * <p>
    * Parameter: <br>
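
For programmatic loads that stage files on the cluster filesystem, the new flag
composes with the map-based entry point the test helper below already uses. A
sketch (the hfile path is hypothetical; conf and tableName assumed in scope):

    LoadIncrementalHFiles tool = new LoadIncrementalHFiles(conf);
    tool.disableReplication();   // marker will carry replicate=false
    Map<byte[], List<Path>> family2Files = new HashMap<>();
    family2Files.put(Bytes.toBytes("f"),
        Arrays.asList(new Path("/bulk_dir/f/some-hfile")));  // hypothetical
    tool.run(family2Files, tableName);
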
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 4d69bdf..14e3976 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -25,11 +25,17 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -45,6 +51,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -57,14 +64,21 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -109,9 +123,11 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   private static final String PEER_ID3 = "3";
   private static final String PEER_ID4 = "4";
 
-  private static final AtomicInteger BULK_LOADS_COUNT = new AtomicInteger(0);
+  private static AtomicInteger BULK_LOADS_COUNT;
   private static CountDownLatch BULK_LOAD_LATCH;
 
+  private static final Path BULK_LOAD_BASE_DIR = new Path("/bulk_dir");
+
   private static HBaseTestingUtility utility3;
   private static HBaseTestingUtility utility4;
   private static Configuration conf3;
@@ -151,7 +167,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     util.startMiniCluster(2);
 
     TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+        .setMobEnabled(true)
+        .setMobThreshold(4000)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
@@ -180,6 +198,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     setupCoprocessor(utility1);
     setupCoprocessor(utility4);
     setupCoprocessor(utility3);
+    BULK_LOADS_COUNT = new AtomicInteger(0);
   }
 
   private ReplicationPeerConfig getPeerConfigForCluster(HBaseTestingUtility util) {
@@ -190,9 +209,16 @@ public class TestBulkLoadReplication extends TestReplicationBase {
   private void setupCoprocessor(HBaseTestingUtility cluster){
     cluster.getHBaseCluster().getRegions(tableName).forEach(r -> {
       try {
-        r.getCoprocessorHost()
-          .load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
-            cluster.getConfiguration());
+        TestBulkLoadReplication.BulkReplicationTestObserver cp = r.getCoprocessorHost().
+          findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+        if(cp == null) {
+          r.getCoprocessorHost().
+            load(TestBulkLoadReplication.BulkReplicationTestObserver.class, 0,
+              cluster.getConfiguration());
+          cp = r.getCoprocessorHost().
+            findCoprocessor(TestBulkLoadReplication.BulkReplicationTestObserver.class);
+          cp.clusterName = cluster.getClusterKey();
+        }
       } catch (Exception e){
         LOG.error(e.getMessage(), e);
       }
@@ -206,6 +232,7 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     utility4.getAdmin().removeReplicationPeer(PEER_ID1);
     utility4.getAdmin().removeReplicationPeer(PEER_ID3);
     utility3.getAdmin().removeReplicationPeer(PEER_ID4);
+    utility1.getAdmin().removeReplicationPeer(PEER_ID4);
   }
 
   private static void setupBulkLoadConfigsForCluster(Configuration config,
@@ -241,6 +268,31 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }
 
+  @Test
+  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+    Path path = createMobFiles(utility3);
+    ColumnFamilyDescriptor descriptor =
+      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+    ExecutorService pool = null;
+    try {
+      pool = Executors.newFixedThreadPool(1);
+      PartitionedMobCompactor compactor =
+        new PartitionedMobCompactor(utility3.getConfiguration(), utility3.getTestFileSystem(),
+          tableName, descriptor, pool);
+      BULK_LOAD_LATCH = new CountDownLatch(1);
+      BULK_LOADS_COUNT.set(0);
+      compactor.compact(Arrays.asList(utility3.getTestFileSystem().listStatus(path)), true);
+      assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+      Thread.sleep(400);
+      assertEquals(1, BULK_LOADS_COUNT.get());
+    } finally {
+      if(pool != null && !pool.isTerminated()) {
+        pool.shutdownNow();
+      }
+    }
+  }
+
+
   private void assertBulkLoadConditions(byte[] row, byte[] value,
       HBaseTestingUtility utility, Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -260,13 +312,13 @@ public class TestBulkLoadReplication extends TestReplicationBase {
       new LoadIncrementalHFiles(cluster.getConfiguration());
     Map<byte[], List<Path>> family2Files = new HashMap<>();
     List<Path> files = new ArrayList<>();
-    files.add(new Path("/bulk_dir/f/" + bulkLoadFilePath.getName()));
+    files.add(new Path(BULK_LOAD_BASE_DIR + "/f/" + bulkLoadFilePath.getName()));
     family2Files.put(Bytes.toBytes("f"), files);
     bulkLoadHFilesTool.run(family2Files, tableName);
   }
 
   private void copyToHdfs(String bulkLoadFilePath, MiniDFSCluster cluster) throws Exception {
-    Path bulkLoadDir = new Path("/bulk_dir/f");
+    Path bulkLoadDir = new Path(BULK_LOAD_BASE_DIR + "/f/");
     cluster.getFileSystem().mkdirs(bulkLoadDir);
     cluster.getFileSystem().copyFromLocalFile(new Path(bulkLoadFilePath), bulkLoadDir);
   }
@@ -307,22 +359,53 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
+  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+    Path testDir = FSUtils.getRootDir(util.getConfiguration());
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    MobFileName mobFileName = null;
+    byte[] mobFileStartRow = new byte[32];
+    for (byte rowKey : Bytes.toBytes("01234")) {
+      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+        UUID.randomUUID().toString().replaceAll("-", ""));
+      StoreFileWriter mobFileWriter =
+        new StoreFileWriter.Builder(util.getConfiguration(),
+          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      long now = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < 10; i++) {
+          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+          byte[] dummyData = new byte[5000];
+          new Random().nextBytes(dummyData);
+          mobFileWriter.append(
+            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+        }
+      } finally {
+        mobFileWriter.close();
+      }
+    }
+    return basePath;
+  }
+
   public static class BulkReplicationTestObserver implements RegionCoprocessor {
 
+    String clusterName;
+    AtomicInteger bulkLoadCounts = new AtomicInteger();
+
     @Override
     public Optional<RegionObserver> getRegionObserver() {
       return Optional.of(new RegionObserver() {
-        @Override
-        public void preBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
-          List<Pair<byte[], String>> familyPaths) throws IOException {
-            BULK_LOADS_COUNT.incrementAndGet();
-        }
 
         @Override
         public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
           List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths)
             throws IOException {
           BULK_LOAD_LATCH.countDown();
+          BULK_LOADS_COUNT.incrementAndGet();
+          LOG.debug("Another file bulk loaded. Total for {}: {}", clusterName,
+            bulkLoadCounts.addAndGet(1));
         }
       });
     }