Posted to commits@hbase.apache.org by sy...@apache.org on 2015/12/15 18:43:19 UTC

[05/26] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a8cffba..9ff4b2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -45,8 +45,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
@@ -225,8 +227,16 @@ public class ReplicationSourceManager implements ReplicationListener {
    * old region server wal queues
    */
   protected void init() throws IOException, ReplicationException {
+    boolean replicationForBulkLoadDataEnabled =
+        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
     for (String id : this.replicationPeers.getPeerIds()) {
       addSource(id);
+      if (replicationForBulkLoadDataEnabled) {
+        // Check if the peer exists in the hfile-refs queue; if not, add it. This can happen
+        // when a peer was added before replication for bulk loaded data was enabled.
+        this.replicationQueues.addPeerToHFileRefs(id);
+      }
     }
     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
     if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -733,4 +743,15 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     return stats.toString();
   }
+
+  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+      throws ReplicationException {
+    for (ReplicationSourceInterface source : this.sources) {
+      source.addHFileRefs(tableName, family, files);
+    }
+  }
+
+  public void cleanUpHFileRefs(String peerId, List<String> files) {
+    this.replicationQueues.removeHFileRefs(peerId, files);
+  }
 }
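A note on the configuration this hunk keys off: bulk loaded data is only tracked for replication when HConstants.REPLICATION_BULKLOAD_ENABLE_KEY is set to true, and init() then also registers every existing peer under the hfile-refs queue via addPeerToHFileRefs(). A minimal sketch of how a cluster opts in is below; the cluster id value is a placeholder, and the snippet relies on the HConstants keys used by this patch rather than spelling out their string values.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    Configuration conf = HBaseConfiguration.create();
    // Enable replication of bulk loaded data (disabled by default).
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // Each cluster taking part in bulk load replication needs a unique, stable id;
    // "source-cluster" is only an illustrative value.
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "source-cluster");

The same pair of settings shows up in the test changes further down in this patch (TestMasterReplication and TestReplicationSyncUpToolWithBulkLoadedData).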

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8271115
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface that defines how a region server in the peer cluster obtains the source cluster's
+ * file system configuration. Users can plug in a custom implementation of this interface by
+ * setting its fully qualified class name as the value of the
+ * hbase.replication.source.fs.conf.provider property in the RegionServer configuration. The
+ * default is {@link DefaultSourceFSConfigurationProvider}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface SourceFSConfigurationProvider {
+
+  /**
+   * Returns the source cluster file system configuration for the given source cluster replication
+   * ID.
+   * @param sinkConf sink cluster configuration
+   * @param replicationClusterId unique ID which identifies the source cluster
+   * @return source cluster file system configuration
+   * @throws IOException for invalid directory or for a bad disk.
+   */
+  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+      throws IOException;
+
+}
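For context, this interface is the hook a peer (sink) region server uses to resolve the source cluster's file system configuration when it copies bulk loaded hfiles across clusters. A minimal sketch of a custom provider follows; the class name, the hbase.replication.conf.dir property and the per-cluster directory layout are illustrative assumptions, not part of this patch:

    package org.apache.hadoop.hbase.replication.regionserver;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;

    // Hypothetical provider that loads the source cluster's *-site.xml files from a
    // per-cluster directory configured on the sink.
    public class DirBackedSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
      @Override
      public Configuration getConf(Configuration sinkConf, String replicationClusterId)
          throws IOException {
        String baseDir = sinkConf.get("hbase.replication.conf.dir");
        if (baseDir == null) {
          throw new IOException("hbase.replication.conf.dir is not configured on the sink");
        }
        Configuration sourceConf = new Configuration(false);
        // e.g. <baseDir>/<replicationClusterId>/core-site.xml and hdfs-site.xml
        sourceConf.addResource(new Path(baseDir, replicationClusterId + "/core-site.xml"));
        sourceConf.addResource(new Path(baseDir, replicationClusterId + "/hdfs-site.xml"));
        return sourceConf;
      }
    }

Such a provider would be selected by setting hbase.replication.source.fs.conf.provider to its fully qualified class name, as the javadoc above describes.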

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index becc9f3..3541ade 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -217,7 +217,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
     }
-    
+
     Token userToken = null;
     if (userProvider.isHadoopSecurityEnabled()) {
       userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
@@ -375,6 +375,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
       Path p = new Path(srcPath);
       Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+
+      // In case of replication of bulk loaded files, the hfiles are already copied into the staging directory
+      if (p.equals(stageP)) {
+        LOG.debug(p.getName()
+            + " is already available in staging directory. Skipping copy or rename.");
+        return stageP.toString();
+      }
+
       if (srcFs == null) {
         srcFs = FileSystem.get(p.toUri(), conf);
       }
@@ -414,6 +422,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
       Path p = new Path(srcPath);
       Path stageP = new Path(stagingDir,
           new Path(Bytes.toString(family), p.getName()));
+
+      // In case of replication of bulk loaded files, the hfiles are not renamed by the endpoint
+      // during the prepare stage, so there is no need to rename them back here.
+      if (p.equals(stageP)) {
+        LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+        return;
+      }
+
       LOG.debug("Moving " + stageP + " back to " + p);
       if(!fs.rename(stageP, p))
         throw new IOException("Failed to move HFile: " + stageP + " to " + p);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
new file mode 100644
index 0000000..87db386
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestReplicationHFileCleaner {
+  private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Server server;
+  private static ReplicationQueues rq;
+  private static ReplicationPeers rp;
+  private static final String peerId = "TestReplicationHFileCleaner";
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  static FileSystem fs = null;
+  Path root;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    server = new DummyServer();
+    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    Replication.decorateMasterConfiguration(conf);
+    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+    rp.init();
+
+    rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
+    rq.init(server.getServerName().toString());
+    // Obtain the test file system; it is shared by the tests below, so it must not be closed here.
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Before
+  public void setup() throws ReplicationException, IOException {
+    root = TEST_UTIL.getDataTestDirOnTestFS();
+    rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
+  }
+
+  @After
+  public void cleanup() throws ReplicationException {
+    try {
+      fs.delete(root, true);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete files recursively from path " + root);
+    }
+    rp.removePeer(peerId);
+  }
+
+  @Test
+  public void testIsFileDeletable() throws IOException, ReplicationException {
+    // 1. Create a file
+    Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+    fs.createNewFile(file);
+    // 2. Assert file is successfully created
+    assertTrue("Test file not created!", fs.exists(file));
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+    // 3. Assert that the file, as is, should be deletable
+    assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
+        + "for it in the queue.",
+      cleaner.isFileDeletable(fs.getFileStatus(file)));
+
+    List<String> files = new ArrayList<String>(1);
+    files.add(file.getName());
+    // 4. Add the file to hfile-refs queue
+    rq.addHFileRefs(peerId, files);
+    // 5. Assert file should not be deletable
+    assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
+        + "for it in the queue.",
+      cleaner.isFileDeletable(fs.getFileStatus(file)));
+  }
+
+  @Test
+  public void testGetDeletableFiles() throws Exception {
+    // 1. Create two files and assert that they do not exist
+    Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
+    fs.createNewFile(notDeletablefile);
+    assertTrue("Test file not created!", fs.exists(notDeletablefile));
+    Path deletablefile = new Path(root, "testGetDeletableFiles_2");
+    fs.createNewFile(deletablefile);
+    assertTrue("Test file not created!", fs.exists(deletablefile));
+
+    List<FileStatus> files = new ArrayList<FileStatus>(2);
+    FileStatus f = new FileStatus();
+    f.setPath(deletablefile);
+    files.add(f);
+    f = new FileStatus();
+    f.setPath(notDeletablefile);
+    files.add(f);
+
+    List<String> hfiles = new ArrayList<>(1);
+    hfiles.add(notDeletablefile.getName());
+    // 2. Add one file to hfile-refs queue
+    rq.addHFileRefs(peerId, hfiles);
+
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+    // 3. Assert that only the file without an hfile reference node is returned as deletable
+    Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
+    assertTrue("Expected one deletable file", deletableFilesIterator.hasNext());
+    assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
+    if (deletableFilesIterator.hasNext()) {
+      fail("File " + notDeletablefile
+          + " should not be deletable as its hfile reference node was added to the queue.");
+    }
+  }
+
+  /*
+   * Test for HBASE-14621. This test does not directly assert anything. Without the fix, the test
+   * would end up in an infinite loop and therefore time out.
+   */
+  @Test(timeout = 15000)
+  public void testForDifferentHFileRefsZnodeVersion() throws Exception {
+    // 1. Create a file
+    Path file = new Path(root, "testForDifferentHFileRefsZnodeVersion");
+    fs.createNewFile(file);
+    // 2. Assert file is successfully created
+    assertTrue("Test file not created!", fs.exists(file));
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+
+    ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
+    // Return a different znode version for each call
+    Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
+
+    Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
+    Field rqc = cleanerClass.getDeclaredField("rqc");
+    rqc.setAccessible(true);
+    rqc.set(cleaner, replicationQueuesClient);
+
+    cleaner.isFileDeletable(fs.getFileStatus(file));
+  }
+
+  static class DummyServer implements Server {
+
+    @Override
+    public Configuration getConfiguration() {
+      return TEST_UTIL.getConfiguration();
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      try {
+        return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      return null;
+    }
+
+    @Override
+    public CoordinatedStateManager getCoordinatedStateManager() {
+      return null;
+    }
+
+    @Override
+    public ClusterConnection getConnection() {
+      return null;
+    }
+
+    @Override
+    public MetaTableLocator getMetaTableLocator() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return ServerName.valueOf("regionserver,60020,000000");
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+
+    @Override
+    public ChoreService getChoreService() {
+      return null;
+    }
+  }
+}
\ No newline at end of file
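The cleaner exercised by this test only takes effect if the master actually runs it. In the test above that wiring happens through Replication.decorateMasterConfiguration(conf) in setUpBeforeClass; the equivalent manual configuration would be to append the cleaner to the master's HFile cleaner plugin list, roughly as sketched below. The hbase.master.hfilecleaner.plugins property name is the standard HFile cleaner plugin key and is assumed here rather than shown in this mail:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;

    Configuration conf = HBaseConfiguration.create();
    String plugins = conf.get("hbase.master.hfilecleaner.plugins", "");
    conf.set("hbase.master.hfilecleaner.plugins",
        plugins.isEmpty() ? ReplicationHFileCleaner.class.getName()
            : plugins + "," + ReplicationHFileCleaner.class.getName());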

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index f463f76..abe484e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -89,4 +91,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public String getStats() {
     return "";
   }
+
+  @Override
+  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+      throws ReplicationException {
+    return;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 455a790..e919c24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,15 +19,21 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -35,7 +41,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -48,12 +56,17 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
@@ -79,6 +92,7 @@ public class TestMasterReplication {
 
   private static final TableName tableName = TableName.valueOf("test");
   private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] famName1 = Bytes.toBytes("f1");
   private static final byte[] row = Bytes.toBytes("row");
   private static final byte[] row1 = Bytes.toBytes("row1");
   private static final byte[] row2 = Bytes.toBytes("row2");
@@ -103,7 +117,11 @@ public class TestMasterReplication {
     baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
     baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
     baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
-        HConstants.REPLICATION_ENABLE_DEFAULT);
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
+      TestSourceFSConfigurationProvider.class.getCanonicalName());
+    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
     baseConfiguration.setBoolean("dfs.support.append", true);
     baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
     baseConfiguration.setStrings(
@@ -114,6 +132,9 @@ public class TestMasterReplication {
     HColumnDescriptor fam = new HColumnDescriptor(famName);
     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
     table.addFamily(fam);
+    fam = new HColumnDescriptor(famName1);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
     fam = new HColumnDescriptor(noRepfamName);
     table.addFamily(fam);
   }
@@ -130,14 +151,7 @@ public class TestMasterReplication {
     int numClusters = 2;
     Table[] htables = null;
     try {
-      startMiniClusters(numClusters);
-      createTableOnClusters(table);
-
-      htables = getHTablesOnClusters(tableName);
-
-      // Test the replication scenarios of 0 -> 1 -> 0
-      addPeer("1", 0, 1);
-      addPeer("1", 1, 0);
+      htables = setUpClusterTablesAndPeers(numClusters);
 
       int[] expectedCounts = new int[] { 2, 2 };
 
@@ -157,12 +171,64 @@ public class TestMasterReplication {
   }
 
   /**
-   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
-   * deleting rows to a table in each clusters and ensuring that the each of
-   * these clusters get the appropriate mutations. It also tests the grouping
-   * scenario where a cluster needs to replicate the edits originating from
-   * itself and also the edits that it received using replication from a
-   * different cluster. The scenario is explained in HBASE-9158
+   * Tests the replication scenario 0 -> 1 -> 0. It does so by bulk loading a set of HFiles into
+   * a table in each cluster and checking that the data is replicated to the other cluster.
+   */
+  @Test(timeout = 300000)
+  public void testHFileCyclicReplication() throws Exception {
+    LOG.info("testHFileCyclicReplication");
+    int numClusters = 2;
+    Table[] htables = null;
+    try {
+      htables = setUpClusterTablesAndPeers(numClusters);
+
+      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been replicated
+      // to cluster '1'.
+      byte[][][] hfileRanges =
+          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+      int numOfRows = 100;
+      int[] expectedCounts =
+          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
+        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+      // Load 200 rows for each hfile range in cluster '1' and validate whether it has been replicated
+      // to cluster '0'.
+      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+      numOfRows = 200;
+      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+          hfileRanges.length * numOfRows + expectedCounts[1] };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
+        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
+
+  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
+    Table[] htables;
+    startMiniClusters(numClusters);
+    createTableOnClusters(table);
+
+    htables = getHTablesOnClusters(tableName);
+    // Test the replication scenarios of 0 -> 1 -> 0
+    addPeer("1", 0, 1);
+    addPeer("1", 1, 0);
+    return htables;
+  }
+
+  /**
+   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows in a
+   * table in each cluster and ensuring that each of these clusters gets the appropriate
+   * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
+   * originating from itself and also the edits that it received using replication from a different
+   * cluster. The scenario is explained in HBASE-9158
    */
   @Test(timeout = 300000)
   public void testCyclicReplication2() throws Exception {
@@ -213,6 +279,119 @@ public class TestMasterReplication {
   }
 
   /**
+   * Tests the multi-slave hfile replication scenario 0 -> 1, 2. It does so by bulk loading a set
+   * of HFiles into a table in the master cluster and checking that they are replicated to its peers.
+   */
+  @Test(timeout = 300000)
+  public void testHFileMultiSlaveReplication() throws Exception {
+    LOG.info("testHFileMultiSlaveReplication");
+    int numClusters = 3;
+    Table[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      // Add a slave, 0 -> 1
+      addPeer("1", 0, 1);
+
+      htables = getHTablesOnClusters(tableName);
+
+      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been replicated
+      // to cluster '1'.
+      byte[][][] hfileRanges =
+          new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
+              new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
+      int numOfRows = 100;
+
+      int[] expectedCounts =
+          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
+        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+      // Validate data is not replicated to cluster '2'.
+      assertEquals(0, utilities[2].countRows(htables[2]));
+
+      rollWALAndWait(utilities[0], htables[0].getName(), row);
+
+      // Add one more slave, 0 -> 2
+      addPeer("2", 0, 2);
+
+      // Load 200 rows for each hfile range in cluster '0' and validate whether it has been replicated
+      // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
+      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
+          new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
+      numOfRows = 200;
+
+      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+          hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
+        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
+  
+  /**
+   * Tests bulk loaded hfile replication restricted to explicitly specified table column families.
+   * It does so by bulk loading a set of HFiles into both column families of the table while
+   * configuring only one of the families for replication.
+   */
+  @Test(timeout = 300000)
+  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
+    LOG.info("testHFileReplicationForConfiguredTableCfs");
+    int numClusters = 2;
+    Table[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      htables = getHTablesOnClusters(tableName);
+      // Test the replication scenario where only 'f' is configured for table data replication, not 'f1'
+      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
+
+      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
+      byte[][][] hfileRanges =
+          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+      int numOfRows = 100;
+      int[] expectedCounts =
+          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
+        hfileRanges, numOfRows, expectedCounts, true);
+
+      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
+      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+      numOfRows = 100;
+
+      int[] newExpectedCounts =
+          new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
+
+      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
+        hfileRanges, numOfRows, newExpectedCounts, false);
+
+      // Validate data replication for CF 'f1'
+
+      // Source cluster table should contain data for the families
+      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
+
+      // Sleep for enough time so that the data is still not replicated for the CF which is not
+      // configured for replication
+      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
+      // Peer cluster should have only configured CF data
+      wait(1, htables[1], expectedCounts[1]);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
+
+  /**
    * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
    */
   @Test(timeout = 300000)
@@ -328,6 +507,17 @@ public class TestMasterReplication {
       close(replicationAdmin);
     }
   }
+  
+  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
+      throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
+      replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs);
+    } finally {
+      close(replicationAdmin);
+    }
+  }
 
   private void disablePeer(String id, int masterClusterNumber) throws Exception {
     ReplicationAdmin replicationAdmin = null;
@@ -405,8 +595,56 @@ public class TestMasterReplication {
     wait(row, target, false);
   }
 
-  private void wait(byte[] row, Table target, boolean isDeleted)
-      throws Exception {
+  private void loadAndValidateHFileReplication(String testName, int masterNumber,
+      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
+      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
+    HBaseTestingUtility util = utilities[masterNumber];
+
+    Path dir = util.getDataTestDirOnTestFS(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(util.getConfiguration(), fs,
+        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    Table source = tables[masterNumber];
+    final TableName tableName = source.getName();
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String[] args = { dir.toString(), tableName.toString() };
+    loader.run(args);
+
+    if (toValidate) {
+      for (int slaveClusterNumber : slaveNumbers) {
+        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
+      }
+    }
+  }
+
+  private void wait(int slaveNumber, Table target, int expectedCount)
+      throws IOException, InterruptedException {
+    int count = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for bulkloaded data replication. Current count=" + count
+            + ", expected count=" + expectedCount);
+      }
+      count = utilities[slaveNumber].countRows(target);
+      if (count != expectedCount) {
+        LOG.info("Waiting more time for bulkloaded data replication.");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i == NB_RETRIES - 1) {
@@ -430,6 +668,47 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+      final byte[] row) throws IOException {
+    final Admin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener.Base() {
+          @Override
+          public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+            latch.countDown();
+          }
+        };
+    region.getWAL().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+      region.getRegionInfo().getRegionName()));
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
+          "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getWAL().unregisterWALActionsListener(listener);
+  }
+
   /**
    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
    * timestamp there is otherwise no way to count them.
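Referring back to testHFileReplicationForConfiguredTableCfs above: the per-column-family restriction is expressed through the table-CFs form of ReplicationAdmin.addPeer, which the new addPeer(String, int, int, String) helper wraps. A standalone sketch of that call is below; the cluster key is a placeholder and the try/finally is only the usual cleanup:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

    ReplicationAdmin admin = new ReplicationAdmin(HBaseConfiguration.create());
    try {
      // Replicate only column family 'f' of table 'test' to peer "1"; edits and
      // bulk loads against 'f1' stay local, which is what the test verifies.
      admin.addPeer("1", "zk-quorum-host:2181:/hbase", "test:f");
    } finally {
      admin.close();
    }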

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 4823597..47d2880 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -658,7 +658,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     HRegionInfo hri = new HRegionInfo(htable1.getName(),
       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
     WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
-    Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit);
+    Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
+      htable1.getConfiguration(), null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 696c130..41c3240 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -160,6 +161,62 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
+  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
+    rp.init();
+    rq1.init(server1);
+    rqc.init();
+
+    List<String> files1 = new ArrayList<String>(3);
+    files1.add("file_1");
+    files1.add("file_2");
+    files1.add("file_3");
+    assertNull(rqc.getReplicableHFiles(ID_ONE));
+    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rq1.addHFileRefs(ID_ONE, files1);
+    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+    List<String> files2 = new ArrayList<>(files1);
+    String removedString = files2.remove(0);
+    rq1.removeHFileRefs(ID_ONE, files2);
+    assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
+    files2 = new ArrayList<>(1);
+    files2.add(removedString);
+    rq1.removeHFileRefs(ID_ONE, files2);
+    assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
+    rp.removePeer(ID_ONE);
+  }
+
+  @Test
+  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
+    rq1.init(server1);
+    rqc.init();
+
+    rp.init();
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+
+    List<String> files1 = new ArrayList<String>(3);
+    files1.add("file_1");
+    files1.add("file_2");
+    files1.add("file_3");
+    rq1.addHFileRefs(ID_ONE, files1);
+    rq1.addHFileRefs(ID_TWO, files1);
+    assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+    rp.removePeer(ID_ONE);
+    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertNull(rqc.getReplicableHFiles(ID_ONE));
+    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+    rp.removePeer(ID_TWO);
+    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertNull(rqc.getReplicableHFiles(ID_TWO));
+  }
+
+  @Test
   public void testReplicationPeers() throws Exception {
     rp.init();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 4587c61..3b7402a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -64,6 +64,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     utility = new HBaseTestingUtility();
     utility.startMiniZKCluster();
     conf = utility.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 13545b5..b36bb9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -52,15 +52,15 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
   private static final TableName t1_su = TableName.valueOf("t1_syncup");
   private static final TableName t2_su = TableName.valueOf("t2_syncup");
 
-  private static final byte[] famName = Bytes.toBytes("cf1");
+  protected static final byte[] famName = Bytes.toBytes("cf1");
   private static final byte[] qualName = Bytes.toBytes("q1");
 
-  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
 
   private HTableDescriptor t1_syncupSource, t1_syncupTarget;
   private HTableDescriptor t2_syncupSource, t2_syncupTarget;
 
-  private Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
+  protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
 
   @Before
   public void setUp() throws Exception {
@@ -179,7 +179,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
 
   }
 
-  private void setupReplication() throws Exception {
+  protected void setupReplication() throws Exception {
     ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
     ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
 
@@ -418,7 +418,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
     }
   }
 
-  private void syncUp(HBaseTestingUtility ut) throws Exception {
+  protected void syncUp(HBaseTestingUtility ut) throws Exception {
     ReplicationSyncUp.setConfigure(ut.getConfiguration());
     String[] arguments = new String[] { null };
     new ReplicationSyncUp().run(arguments);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
new file mode 100644
index 0000000..f54c632
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf1.set("hbase.replication.source.fs.conf.provider",
+      TestSourceFSConfigurationProvider.class.getCanonicalName());
+    String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
+      classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
+      conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
+    }
+
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @Override
+  public void testSyncUpTool() throws Exception {
+    /**
+     * Set up replication between the master and one slave. Tables: t1_syncup and t2_syncup.
+     * Column families: 'cf1' (replicated) and 'norep' (not replicated).
+     */
+    setupReplication();
+
+    /**
+     * Prepare 16 random hfile ranges required for creating hfiles
+     */
+    Iterator<String> randomHFileRangeListIterator = null;
+    Set<String> randomHFileRanges = new HashSet<String>(16);
+    for (int i = 0; i < 16; i++) {
+      randomHFileRanges.add(UUID.randomUUID().toString());
+    }
+    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
+    Collections.sort(randomHFileRangeList);
+    randomHFileRangeListIterator = randomHFileRangeList.iterator();
+
+    /**
+     * At the master: t1_syncup: load 100 rows into cf1 and 3 rows into norep; t2_syncup: load
+     * 200 rows into cf1 and 3 rows into norep. Verify the rows are correctly replicated to the
+     * slave.
+     */
+    loadAndReplicateHFiles(true, randomHFileRangeListIterator);
+
+    /**
+     * Verify that hfile load works across a sync-up:
+     * step 1: stop hbase on the slave.
+     * step 2: at the master, load another 100 rows into cf1 and 3 rows into norep of t1_syncup,
+     *         and another 200 rows into cf1 and 3 rows into norep of t2_syncup.
+     * step 3: stop hbase on the master, restart hbase on the slave.
+     * step 4: verify the slave still has the rows loaded before: t1_syncup 100 rows from cf1,
+     *         t2_syncup 200 rows from cf1.
+     * step 5: run the syncup tool on the master.
+     * step 6: verify that the hfiles show up on the slave and 'norep' does not:
+     *         t1_syncup 200 rows from cf1, t2_syncup 400 rows from cf1.
+     */
+    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
+
+  }
+
+  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
+      throws Exception {
+    LOG.debug("mimicSyncUpAfterBulkLoad");
+    utility2.shutdownMiniHBaseCluster();
+
+    loadAndReplicateHFiles(false, randomHFileRangeListIterator);
+
+    int rowCount_ht1Source = utility1.countRows(ht1Source);
+    assertEquals("t1_syncup has 206 rows on source, after bulk loading another 103 rows", 206,
+      rowCount_ht1Source);
+
+    int rowCount_ht2Source = utility1.countRows(ht2Source);
+    assertEquals("t2_syncup has 406 rows on source, after bulk loading another 203 rows", 406,
+      rowCount_ht2Source);
+
+    utility1.shutdownMiniHBaseCluster();
+    utility2.restartHBaseCluster(1);
+
+    Thread.sleep(SLEEP_TIME);
+
+    // Before sync up
+    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+
+    // Run sync up tool
+    syncUp(utility1);
+
+    // After sync up
+    for (int i = 0; i < NB_RETRIES; i++) {
+      syncUp(utility1);
+      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+      if (i == NB_RETRIES - 1) {
+        if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
+          // syncUp still failed. Let's look at the source in case anything is wrong there
+          utility1.restartHBaseCluster(1);
+          rowCount_ht1Source = utility1.countRows(ht1Source);
+          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
+          rowCount_ht2Source = utility1.countRows(ht2Source);
+          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
+        }
+        assertEquals("@Peer1 t1_syncup should be synced up and have 200 rows", 200,
+          rowCount_ht1TargetAtPeer1);
+        assertEquals("@Peer1 t2_syncup should be synced up and have 400 rows", 400,
+          rowCount_ht2TargetAtPeer1);
+      }
+      if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
+        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
+        break;
+      } else {
+        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+            + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+            + rowCount_ht2TargetAtPeer1);
+      }
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
+      Iterator<String> randomHFileRangeListIterator) throws Exception {
+    LOG.debug("loadAndReplicateHFiles");
+
+    // Load 100 + 3 hfiles to t1_syncup.
+    byte[][][] hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
+      100);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
+      hfileRanges, 3);
+
+    // Load 200 + 3 hfiles to t2_syncup.
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
+      200);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
+      hfileRanges, 3);
+
+    if (verifyReplicationOnSlave) {
+      // ensure replication completed
+      wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
+        "t1_syncup has 103 rows on source, and 100 on slave1");
+
+      wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
+        "t2_syncup has 203 rows on source, and 200 on slave1");
+    }
+  }
+
+  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+    Path dir = utility1.getDataTestDirOnTestFS(testName);
+    FileSystem fs = utility1.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
+          + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
+    String[] args = { dir.toString(), tableName.toString() };
+    loader.run(args);
+  }
+
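+  /**
+   * Polls the row count of the target table until it matches the expected count, asserting with
+   * the given message on the final retry.
+   */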
+  private void wait(Table target, int expectedCount, String msg) throws IOException,
+      InterruptedException {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      int rowCount = utility2.countRows(target);
+      if (i == NB_RETRIES - 1) {
+        assertEquals(msg, expectedCount, rowCount);
+      }
+      if (expectedCount == rowCount) {
+        break;
+      }
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index b87e7ef..f08d2bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -21,32 +21,52 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -58,21 +78,18 @@ public class TestReplicationSink {
   private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
   private static final int BATCH_SIZE = 10;
 
-  private final static HBaseTestingUtility TEST_UTIL =
-      new HBaseTestingUtility();
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private static ReplicationSink SINK;
+  protected static ReplicationSink SINK;
 
-  private static final TableName TABLE_NAME1 =
-      TableName.valueOf("table1");
-  private static final TableName TABLE_NAME2 =
-      TableName.valueOf("table2");
+  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
+  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");
 
-  private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
-  private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
 
-  private static Table table1;
-  private static Stoppable STOPPABLE = new Stoppable() {
+  protected static Table table1;
+  protected static Stoppable STOPPABLE = new Stoppable() {
     final AtomicBoolean stop = new AtomicBoolean(false);
 
     @Override
@@ -85,10 +102,13 @@ public class TestReplicationSink {
       LOG.info("STOPPING BECAUSE: " + why);
       this.stop.set(true);
     }
-    
+
   };
 
-  private static Table table2;
+  protected static Table table2;
+  protected static String baseNamespaceDir;
+  protected static String hfileArchiveDir;
+  protected static String replicationClusterId;
 
    /**
    * @throws java.lang.Exception
@@ -98,11 +118,18 @@ public class TestReplicationSink {
     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
         HConstants.REPLICATION_ENABLE_DEFAULT);
+    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
+      TestSourceFSConfigurationProvider.class.getCanonicalName());
+
     TEST_UTIL.startMiniCluster(3);
     SINK =
       new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
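+    // The sink resolves bulk loaded hfile paths against these directories on the source FS.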
+    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
+    replicationClusterId = "12345";
   }
 
   /**
@@ -134,7 +161,8 @@ public class TestReplicationSink {
     for(int i = 0; i < BATCH_SIZE; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Scan scan = new Scan();
     ResultScanner scanRes = table1.getScanner(scan);
     assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
@@ -151,7 +179,8 @@ public class TestReplicationSink {
     for(int i = 0; i < BATCH_SIZE/2; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
 
     entries = new ArrayList<WALEntry>(BATCH_SIZE);
     cells = new ArrayList<Cell>();
@@ -160,7 +189,8 @@ public class TestReplicationSink {
           i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
     }
 
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Scan scan = new Scan();
     ResultScanner scanRes = table1.getScanner(scan);
     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
@@ -179,7 +209,8 @@ public class TestReplicationSink {
               i, KeyValue.Type.Put, cells));
     }
 
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Scan scan = new Scan();
     ResultScanner scanRes = table2.getScanner(scan);
     for(Result res : scanRes) {
@@ -198,14 +229,16 @@ public class TestReplicationSink {
     for(int i = 0; i < 3; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     entries = new ArrayList<WALEntry>(3);
     cells = new ArrayList<Cell>();
     entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
     entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
     entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
 
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
 
     Scan scan = new Scan();
     ResultScanner scanRes = table1.getScanner(scan);
@@ -228,12 +261,96 @@ public class TestReplicationSink {
     for(int i = 3; i < 5; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Get get = new Get(Bytes.toBytes(1));
     Result res = table1.get(get);
     assertEquals(0, res.size());
   }
 
+  /**
+   * Test replicateEntries with a bulk load entry for 25 HFiles
+   */
+  @Test
+  public void testReplicateEntriesForHFiles() throws Exception {
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
+    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
+    int numRows = 10;
+
+    List<Path> p = new ArrayList<>(1);
+
+    // 1. Generate 25 hfile ranges (50 distinct boundary keys)
+    Random rng = new SecureRandom();
+    Set<Integer> numbers = new HashSet<>();
+    while (numbers.size() < 50) {
+      numbers.add(rng.nextInt(1000));
+    }
+    List<Integer> numberList = new ArrayList<>(numbers);
+    Collections.sort(numberList);
+
+    // 2. Create 25 hfiles
+    Configuration conf = TEST_UTIL.getConfiguration();
+    FileSystem fs = dir.getFileSystem(conf);
+    Iterator<Integer> numbersItr = numberList.iterator();
+    for (int i = 0; i < 25; i++) {
+      Path hfilePath = new Path(familyDir, "hfile_" + i);
+      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
+        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
+      p.add(hfilePath);
+    }
+
+    // 3. Create a BulkLoadDescriptor and a WALEdit
+    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+    storeFiles.put(FAM_NAME1, p);
+    WALEdit edit = null;
+    WALProtos.BulkLoadDescriptor loadDescriptor = null;
+
+    try (Connection c = ConnectionFactory.createConnection(conf);
+        RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
+      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
+      loadDescriptor =
+          ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
+            ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1);
+      edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
+    }
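+    // The edit now carries a single bulk load marker cell referencing all 25 hfiles.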
+    List<WALEntry> entries = new ArrayList<WALEntry>(1);
+
+    // 4. Create a WALEntryBuilder
+    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);
+
+    // 5. Copy each hfile to the location the sink expects on the source FS:
+    //    <baseNamespaceDir>/<namespace>/<table>/<encoded region>/<family>/<hfile>
+    for (int i = 0; i < 25; i++) {
+      String pathToHfileFromNS =
+          new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
+              .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
+              .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
+              .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
+              .append("hfile_" + i).toString();
+      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
+
+      FileUtil.copy(fs, p.get(i), fs, new Path(dst), false, conf);
+    }
+
+    entries.add(builder.build());
+    ResultScanner scanRes = null;
+    try {
+      Scan scan = new Scan();
+      scanRes = table1.getScanner(scan);
+      // 6. Assert no existing data in table
+      assertEquals(0, scanRes.next(numRows).length);
+      // 7. Replicate the bulk loaded entry
+      SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      scanRes = table1.getScanner(scan);
+      // 8. Assert data is replicated
+      assertEquals(numRows, scanRes.next(numRows).length);
+    } finally {
+      if (scanRes != null) {
+        scanRes.close();
+      }
+    }
+  }
+
   private WALEntry createEntry(TableName table, int row,  KeyValue.Type type, List<Cell> cells) {
     byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
     byte[] rowBytes = Bytes.toBytes(row);
@@ -256,6 +373,13 @@ public class TestReplicationSink {
         kv = new KeyValue(rowBytes, fam, null,
             now, KeyValue.Type.DeleteFamily);
     }
+    WALEntry.Builder builder = createWALEntryBuilder(table);
+    cells.add(kv);
+
+    return builder.build();
+  }
+
+  private WALEntry.Builder createWALEntryBuilder(TableName table) {
     WALEntry.Builder builder = WALEntry.newBuilder();
     builder.setAssociatedCellCount(1);
     WALKey.Builder keyBuilder = WALKey.newBuilder();
@@ -264,13 +388,10 @@ public class TestReplicationSink {
     uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
     keyBuilder.setClusterId(uuidBuilder.build());
     keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
-    keyBuilder.setWriteTime(now);
+    keyBuilder.setWriteTime(System.currentTimeMillis());
     keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
     keyBuilder.setLogSequenceNumber(-1);
     builder.setKey(keyBuilder.build());
-    cells.add(kv);
-
-    return builder.build();
+    return builder;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d50522c..a208120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -19,13 +19,17 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -51,6 +55,8 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -64,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -108,6 +115,8 @@ public class TestReplicationSourceManager {
 
   private static final byte[] f1 = Bytes.toBytes("f1");
 
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
   private static final TableName test =
       TableName.valueOf("test");
 
@@ -161,10 +170,10 @@ public class TestReplicationSourceManager {
     manager.addSource(slaveId);
 
     htd = new HTableDescriptor(test);
-    HColumnDescriptor col = new HColumnDescriptor("f1");
+    HColumnDescriptor col = new HColumnDescriptor(f1);
     col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
     htd.addFamily(col);
-    col = new HColumnDescriptor("f2");
+    col = new HColumnDescriptor(f2);
     col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
     htd.addFamily(col);
 
@@ -416,6 +425,63 @@ public class TestReplicationSourceManager {
     s0.abort("", null);
   }
 
+  @Test
+  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
+    // 1. Create wal key
+    WALKey logKey = new WALKey();
+    // 2. Get the bulk load wal edit event
+    WALEdit logEdit = getBulkLoadWALEdit();
+
+    // 3. Get the scopes for the key
+    Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager);
+
+    // 4. Assert that no bulk load entry scopes are added when bulk load hfile replication is
+    // disabled
+    assertNull("No bulk load entry scopes should be added if bulk load replication is disabled.",
+      logKey.getScopes());
+  }
+
+  @Test
+  public void testBulkLoadWALEdits() throws Exception {
+    // 1. Create wal key
+    WALKey logKey = new WALKey();
+    // 2. Get the bulk load wal edit event
+    WALEdit logEdit = getBulkLoadWALEdit();
+    // 3. Enable bulk load hfile replication
+    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
+    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+
+    // 4. Get the scopes for the key
+    Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager);
+
+    NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+    // Assert family with replication scope global is present in the key scopes
+    assertTrue("This family scope is set to global, should be part of replication key scopes.",
+      scopes.containsKey(f1));
+    // Assert family with replication scope local is not present in the key scopes
+    assertFalse("This family scope is set to local, should not be part of replication key scopes",
+      scopes.containsKey(f2));
+  }
+
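+  /**
+   * Builds a bulk load WALEdit referencing store files for f1 (replication scope GLOBAL on the
+   * test table descriptor) and f2 (replication scope LOCAL).
+   */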
+  private WALEdit getBulkLoadWALEdit() {
+    // 1. Create store files for the families
+    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+    List<Path> p = new ArrayList<>(1);
+    p.add(new Path(Bytes.toString(f1)));
+    storeFiles.put(f1, p);
+
+    p = new ArrayList<>(1);
+    p.add(new Path(Bytes.toString(f2)));
+    storeFiles.put(f2, p);
+
+    // 2. Create bulk load descriptor
+    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
+      ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);
+
+    // 3. create bulk load wal edit event
+    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
+    return logEdit;
+  }
+
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;
     Server server;

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..a14c02b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
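+/**
+ * Test implementation of SourceFSConfigurationProvider that simply returns the sink cluster's own
+ * configuration, so the source filesystem resolves to the local test cluster's filesystem.
+ */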
+@InterfaceAudience.Private
+public class TestSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+  @Override
+  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+      throws IOException {
+    return sinkConf;
+  }
+}