You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by wc...@apache.org on 2019/10/10 09:37:49 UTC

[hbase] branch master updated: HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated

This is an automated email from the ASF dual-hosted git repository.

wchevreuil pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new fec4c52  HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated
fec4c52 is described below

commit fec4c5249968e112b5ab6f4154d0e6c1fe428abc
Author: Wellington Ramos Chevreuil <wc...@apache.org>
AuthorDate: Thu Oct 10 10:37:42 2019 +0100

    HBASE-23136 PartionedMobFileCompactor bulkloaded files shouldn't get replicated
    
     Signed-off-by: stack <st...@apache.org>
---
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 11 ++--
 .../hbase/shaded/protobuf/RequestConverter.java    |  5 +-
 .../src/main/protobuf/Client.proto                 |  1 +
 hbase-protocol-shaded/src/main/protobuf/WAL.proto  |  1 +
 .../hbase/client/AsyncClusterConnection.java       | 15 ++---
 .../hbase/client/AsyncClusterConnectionImpl.java   |  4 +-
 .../mob/compactions/PartitionedMobCompactor.java   |  4 +-
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  7 ++-
 .../hbase/regionserver/SecureBulkLoadManager.java  |  2 +-
 .../replication/regionserver/ReplicationSink.java  | 23 ++++----
 .../apache/hadoop/hbase/tool/BulkLoadHFiles.java   |  6 ++
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      |  9 ++-
 .../hbase/client/DummyAsyncClusterConnection.java  |  2 +-
 .../regionserver/TestBulkLoadReplication.java      | 65 +++++++++++++++++++++-
 .../tool/TestBulkLoadHFilesSplitRecovery.java      |  3 +-
 15 files changed, 123 insertions(+), 35 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 52e3bf1..8108217 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -2570,16 +2570,19 @@ public final class ProtobufUtil {
     ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
     Map<String, Long> storeFilesSize, long bulkloadSeqId) {
     return toBulkLoadDescriptor(tableName, encodedRegionName, storeFiles,
-      storeFilesSize, bulkloadSeqId, null);
+      storeFilesSize, bulkloadSeqId, null, true);
   }
 
   public static WALProtos.BulkLoadDescriptor toBulkLoadDescriptor(TableName tableName,
       ByteString encodedRegionName, Map<byte[], List<Path>> storeFiles,
-      Map<String, Long> storeFilesSize, long bulkloadSeqId, List<String> clusterIds) {
+      Map<String, Long> storeFilesSize, long bulkloadSeqId,
+      List<String> clusterIds, boolean replicate) {
     BulkLoadDescriptor.Builder desc =
         BulkLoadDescriptor.newBuilder()
-        .setTableName(ProtobufUtil.toProtoTableName(tableName))
-        .setEncodedRegionName(encodedRegionName).setBulkloadSeqNum(bulkloadSeqId);
+          .setTableName(ProtobufUtil.toProtoTableName(tableName))
+          .setEncodedRegionName(encodedRegionName)
+          .setBulkloadSeqNum(bulkloadSeqId)
+          .setReplicate(replicate);
     if(clusterIds != null) {
       desc.addAllClusterIds(clusterIds);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index ae3cd3f..d45423c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -568,7 +568,7 @@ public final class RequestConverter {
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
     return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
-        false, null);
+        false, null, true);
   }
 
   /**
@@ -585,7 +585,7 @@ public final class RequestConverter {
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths, final byte[] regionName, boolean assignSeqNum,
         final Token<?> userToken, final String bulkToken, boolean copyFiles,
-          List<String> clusterIds) {
+          List<String> clusterIds, boolean replicate) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -626,6 +626,7 @@ public final class RequestConverter {
     if (clusterIds != null) {
       request.addAllClusterIds(clusterIds);
     }
+    request.setReplicate(replicate);
     return request.build();
   }
 
diff --git a/hbase-protocol-shaded/src/main/protobuf/Client.proto b/hbase-protocol-shaded/src/main/protobuf/Client.proto
index 07d8d71..a22c623 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Client.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Client.proto
@@ -379,6 +379,7 @@ message BulkLoadHFileRequest {
   optional string bulk_token = 5;
   optional bool copy_file = 6 [default = false];
   repeated string cluster_ids = 7;
+  optional bool replicate = 8 [default = true];
 
   message FamilyPath {
     required bytes family = 1;
diff --git a/hbase-protocol-shaded/src/main/protobuf/WAL.proto b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
index c103075..fd622cf 100644
--- a/hbase-protocol-shaded/src/main/protobuf/WAL.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/WAL.proto
@@ -151,6 +151,7 @@ message BulkLoadDescriptor {
   repeated StoreDescriptor stores = 3;
   required int64 bulkload_seq_num = 4;
   repeated string cluster_ids = 5;
+  optional bool replicate = 6 [default = true];
 }
 
 /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 5c57817..92118ac 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -81,17 +81,18 @@ public interface AsyncClusterConnection extends AsyncConnection {
    * Defined as default here to avoid breaking callers who rely on the bulkLoad version that does
    * not expect additional clusterIds param.
    * @param tableName the target table
-   * @param familyPaths hdfs path for the the table family dirs containg files to be loaded
-   * @param row row key
-   * @param assignSeqNum seq num for the event on WAL
-   * @param userToken user token
-   * @param bulkToken bulk load token
-   * @param copyFiles flag for copying the loaded hfiles
+   * @param familyPaths hdfs path for the table family dirs containing files to be loaded.
+   * @param row row key.
+   * @param assignSeqNum seq num for the event on WAL.
+   * @param userToken user token.
+   * @param bulkToken bulk load token.
+   * @param copyFiles flag for copying the loaded hfiles.
    * @param clusterIds list of cluster ids where the given bulk load has already been processed.
+   * @param replicate flags if the bulkload is targeted for replication.
    */
   CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
     byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles,
-    List<String> clusterIds);
+    List<String> clusterIds, boolean replicate);
 
   /**
    * Clean up after finishing bulk load, no matter success or not.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 746c3b8..046ef41 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -109,13 +109,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
     List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-    String bulkToken, boolean copyFiles, List<String> clusterIds) {
+    String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
     return callerFactory.<Boolean> single().table(tableName).row(row)
       .action((controller, loc, stub) -> ConnectionUtils
         .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
           null,
           (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
-            userToken, bulkToken, copyFiles, clusterIds),
+            userToken, bulkToken, copyFiles, clusterIds, replicate),
           (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
       .call();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index a5823ec..dba591d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -827,7 +827,9 @@ public class PartitionedMobCompactor extends MobCompactor {
       throws IOException {
     // bulkload the ref file
     try {
-      BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
+      BulkLoadHFiles bulkLoader = BulkLoadHFiles.create(conf);
+      bulkLoader.disableReplication();
+      bulkLoader.bulkLoad(tableName, bulkloadDirectory);
     } catch (Exception e) {
       throw new IOException(e);
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a9ff440..571be00 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6146,7 +6146,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
-    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false, null);
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false,
+      null, true);
   }
 
   /**
@@ -6197,7 +6198,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
    */
   public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths,
       boolean assignSeqId, BulkLoadListener bulkLoadListener,
-        boolean copyFile, List<String> clusterIds) throws IOException {
+        boolean copyFile, List<String> clusterIds, boolean replicate) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<>();
@@ -6372,7 +6373,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           WALProtos.BulkLoadDescriptor loadDescriptor =
               ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
                   UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
-                  storeFiles, storeFilesSizes, seqId, clusterIds);
+                  storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
           WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
               loadDescriptor, mvcc);
         } catch (IOException ioe) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index ad19473..bccc8fe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -296,7 +296,7 @@ public class SecureBulkLoadManager {
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
                 new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile(),
-              clusterIds);
+              clusterIds, request.getReplicate());
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 6c3f70c..51cbea8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -203,18 +203,19 @@ public class ReplicationSink {
           // Handle bulk load hfiles replication
           if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
             BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
-            if(bulkLoadsPerClusters == null) {
-              bulkLoadsPerClusters = new HashMap<>();
-            }
-            // Map of table name Vs list of pair of family and list of
-            // hfile paths from its namespace
-            Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap =
-              bulkLoadsPerClusters.get(bld.getClusterIdsList());
-            if (bulkLoadHFileMap == null) {
-              bulkLoadHFileMap = new HashMap<>();
-              bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+            if(bld.getReplicate()) {
+              if (bulkLoadsPerClusters == null) {
+                bulkLoadsPerClusters = new HashMap<>();
+              }
+              // Map of table name Vs list of pair of family and list of
+              // hfile paths from its namespace
+              Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = bulkLoadsPerClusters.get(bld.getClusterIdsList());
+              if (bulkLoadHFileMap == null) {
+                bulkLoadHFileMap = new HashMap<>();
+                bulkLoadsPerClusters.put(bld.getClusterIdsList(), bulkLoadHFileMap);
+              }
+              buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
             }
-            buildBulkLoadHFileMap(bulkLoadHFileMap, table, bld);
           } else {
             // Handle wal replication
             if (isNewRowOrType(previousCell, cell)) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
index f3d627a..1cffe05 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -85,6 +85,10 @@ public interface BulkLoadHFiles {
       throws TableNotFoundException, IOException;
 
   /**
+   * Disable replication for this bulkload, if bulkload replication is configured.
+   */
+  void disableReplication();
+  /**
    * Perform a bulk load of the given directory into the given pre-existing table.
    * @param tableName the table to load into
    * @param dir the directory that was provided as the output path of a job using
@@ -97,4 +101,6 @@ public interface BulkLoadHFiles {
   static BulkLoadHFiles create(Configuration conf) {
     return new BulkLoadHFilesTool(conf);
   }
+
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 0e2e029..294d94b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -132,6 +132,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   private String bulkToken;
 
   private List<String> clusterIds = new ArrayList<>();
+  private boolean replicate = true;
 
   public BulkLoadHFilesTool(Configuration conf) {
     // make a copy, just to be sure we're not overriding someone else's config
@@ -379,7 +380,8 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
           .collect(Collectors.toList());
       CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
       FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-        fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds), (loaded, error) -> {
+        fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
+        (loaded, error) -> {
           if (error != null) {
             LOG.error("Encountered unrecoverable error from region server", error);
             if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
@@ -1052,4 +1054,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
     System.exit(ret);
   }
+
+  @Override
+  public void disableReplication(){
+    this.replicate = false;
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 5a34457..8755749 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -144,7 +144,7 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
   @Override
   public CompletableFuture<Boolean> bulkLoad(TableName tableName,
     List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
-    String bulkToken, boolean copyFiles, List<String> clusterIds) {
+    String bulkToken, boolean copyFiles, List<String> clusterIds, boolean replicate) {
     return null;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
index 7a49989..b227403 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBulkLoadReplication.java
@@ -24,10 +24,15 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Random;
+import java.util.UUID;
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +47,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
@@ -54,14 +60,22 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.mob.MobConstants;
+import org.apache.hadoop.hbase.mob.MobFileName;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.mob.compactions.PartitionedMobCompactor;
+import org.apache.hadoop.hbase.mob.compactions.TestPartitionedMobCompactor;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
@@ -137,7 +151,9 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     UTIL3.startMiniCluster(NUM_SLAVES1);
 
     TableDescriptor table = TableDescriptorBuilder.newBuilder(tableName)
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName).setMaxVersions(100)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
+        .setMobEnabled(true)
+        .setMobThreshold(4000)
         .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
       .setColumnFamily(ColumnFamilyDescriptorBuilder.of(noRepfamName)).build();
 
@@ -232,6 +248,23 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     assertEquals(9, BULK_LOADS_COUNT.get());
   }
 
+  @Test
+  public void testPartionedMOBCompactionBulkLoadDoesntReplicate() throws Exception {
+    Path path = createMobFiles(UTIL3);
+    ColumnFamilyDescriptor descriptor =
+      new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(famName);
+    PartitionedMobCompactor compactor = new PartitionedMobCompactor(UTIL3.getConfiguration(),
+      UTIL3.getTestFileSystem(), tableName, descriptor, Executors.newFixedThreadPool(1));
+    BULK_LOAD_LATCH = new CountDownLatch(1);
+    BULK_LOADS_COUNT.set(0);
+    compactor.compact(Arrays.asList(UTIL3.getTestFileSystem().listStatus(path)), true);
+    assertTrue(BULK_LOAD_LATCH.await(1, TimeUnit.SECONDS));
+    Thread.sleep(400);
+    assertEquals(1, BULK_LOADS_COUNT.get());
+
+  }
+
+
   private void assertBulkLoadConditions(byte[] row, byte[] value,
       HBaseTestingUtility utility, Table...tables) throws Exception {
     BULK_LOAD_LATCH = new CountDownLatch(3);
@@ -292,6 +325,36 @@ public class TestBulkLoadReplication extends TestReplicationBase {
     return hFileLocation.getAbsoluteFile().getAbsolutePath();
   }
 
+  private Path createMobFiles(HBaseTestingUtility util) throws IOException {
+    Path testDir = FSUtils.getRootDir(util.getConfiguration());
+    Path mobTestDir = new Path(testDir, MobConstants.MOB_DIR_NAME);
+    Path basePath = new Path(new Path(mobTestDir, tableName.getNameAsString()), "f");
+    HFileContext meta = new HFileContextBuilder().withBlockSize(8 * 1024).build();
+    MobFileName mobFileName = null;
+    byte[] mobFileStartRow = new byte[32];
+    for (byte rowKey : Bytes.toBytes("01234")) {
+      mobFileName = MobFileName.create(mobFileStartRow, MobUtils.formatDate(new Date()),
+        UUID.randomUUID().toString().replaceAll("-", ""));
+      StoreFileWriter mobFileWriter =
+        new StoreFileWriter.Builder(util.getConfiguration(),
+          new CacheConfig(util.getConfiguration()), util.getTestFileSystem()).withFileContext(meta)
+          .withFilePath(new Path(basePath, mobFileName.getFileName())).build();
+      long now = System.currentTimeMillis();
+      try {
+        for (int i = 0; i < 10; i++) {
+          byte[] key = Bytes.add(Bytes.toBytes(rowKey), Bytes.toBytes(i));
+          byte[] dummyData = new byte[5000];
+          new Random().nextBytes(dummyData);
+          mobFileWriter.append(
+            new KeyValue(key, famName, Bytes.toBytes("1"), now, KeyValue.Type.Put, dummyData));
+        }
+      } finally {
+        mobFileWriter.close();
+      }
+    }
+    return basePath;
+  }
+
   public static class BulkReplicationTestObserver implements RegionCoprocessor {
 
     String clusterName;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
index 1326564..b626fe8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
@@ -267,7 +267,8 @@ public class TestBulkLoadHFilesSplitRecovery {
   private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
     AsyncClusterConnection errConn = spy(conn);
     doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
-      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList());
+      .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean(), anyList(),
+        anyBoolean());
     return errConn;
   }