Posted to commits@hbase.apache.org by ra...@apache.org on 2015/12/10 08:38:34 UTC

[1/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)

Repository: hbase
Updated Branches:
  refs/heads/master 9647fee3f -> 26ac60b03


http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index a8cffba..9ff4b2d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -45,8 +45,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
@@ -225,8 +227,16 @@ public class ReplicationSourceManager implements ReplicationListener {
    * old region server wal queues
    */
   protected void init() throws IOException, ReplicationException {
+    boolean replicationForBulkLoadDataEnabled =
+        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
     for (String id : this.replicationPeers.getPeerIds()) {
       addSource(id);
+      if (replicationForBulkLoadDataEnabled) {
+        // Check if the peer exists in the hfile-refs queue; if not, add it. This can happen
+        // when a peer was added before replication for bulk loaded data was enabled.
+        this.replicationQueues.addPeerToHFileRefs(id);
+      }
     }
     List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
     if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -733,4 +743,15 @@ public class ReplicationSourceManager implements ReplicationListener {
     }
     return stats.toString();
   }
+
+  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+      throws ReplicationException {
+    for (ReplicationSourceInterface source : this.sources) {
+      source.addHFileRefs(tableName, family, files);
+    }
+  }
+
+  public void cleanUpHFileRefs(String peerId, List<String> files) {
+    this.replicationQueues.removeHFileRefs(peerId, files);
+  }
 }

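For context on the configuration gate added to init() above: replication of bulk loaded data is controlled by HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, and the tests later in this patch additionally set HConstants.REPLICATION_CLUSTER_ID and hbase.replication.source.fs.conf.provider. A minimal sketch of enabling the feature, using only the keys this patch itself touches (the cluster id value is illustrative, borrowed from the tests below):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class BulkLoadReplicationConfExample {
      public static Configuration create() {
        Configuration conf = HBaseConfiguration.create();
        // Enable replication of bulk loaded HFiles (defaults to
        // HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT).
        conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
        // A unique id for this cluster; "12345" is the example value used by the tests in this patch.
        conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
        return conf;
      }
    }
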
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8271115
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SourceFSConfigurationProvider.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Interface that defines how a region server in the peer cluster gets the source cluster file
+ * system configuration. Users can plug in a custom implementation of this interface by setting
+ * its fully qualified class name as the value of the hbase.replication.source.fs.conf.provider
+ * property in the RegionServer configuration. Default is
+ * {@link DefaultSourceFSConfigurationProvider}.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+public interface SourceFSConfigurationProvider {
+
+  /**
+   * Returns the source cluster file system configuration for the given source cluster replication
+   * ID.
+   * @param sinkConf sink cluster configuration
+   * @param replicationClusterId unique ID which identifies the source cluster
+   * @return source cluster file system configuration
+   * @throws IOException for invalid directory or for a bad disk.
+   */
+  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+      throws IOException;
+
+}

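As the javadoc above describes, a peer cluster region server obtains the source cluster's file system configuration through this interface, and a custom implementation can be plugged in via the hbase.replication.source.fs.conf.provider property. A minimal sketch of such a provider, assuming the source cluster's settings are available on the sink as a per-cluster resource file (the class name and resource naming are purely illustrative, not part of this patch):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.replication.regionserver.SourceFSConfigurationProvider;

    public class ResourceBasedSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
      @Override
      public Configuration getConf(Configuration sinkConf, String replicationClusterId)
          throws IOException {
        // Start from the sink configuration and layer the source cluster's file system
        // settings on top, loaded from a resource named after the replication cluster id.
        Configuration sourceConf = new Configuration(sinkConf);
        sourceConf.addResource("source-hbase-site-" + replicationClusterId + ".xml");
        return sourceConf;
      }
    }

It would be registered with conf.set("hbase.replication.source.fs.conf.provider", ResourceBasedSourceFSConfigurationProvider.class.getCanonicalName()), mirroring how the tests in this patch register TestSourceFSConfigurationProvider.
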
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
index becc9f3..3541ade 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SecureBulkLoadEndpoint.java
@@ -217,7 +217,7 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     for(ClientProtos.BulkLoadHFileRequest.FamilyPath el : request.getFamilyPathList()) {
       familyPaths.add(new Pair(el.getFamily().toByteArray(),el.getPath()));
     }
-    
+
     Token userToken = null;
     if (userProvider.isHadoopSecurityEnabled()) {
       userToken = new Token(request.getFsToken().getIdentifier().toByteArray(), request.getFsToken()
@@ -375,6 +375,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
     public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
       Path p = new Path(srcPath);
       Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+
+      // In case of replication of bulk loaded files, the hfiles are already copied to the staging directory
+      if (p.equals(stageP)) {
+        LOG.debug(p.getName()
+            + " is already available in staging directory. Skipping copy or rename.");
+        return stageP.toString();
+      }
+
       if (srcFs == null) {
         srcFs = FileSystem.get(p.toUri(), conf);
       }
@@ -414,6 +422,14 @@ public class SecureBulkLoadEndpoint extends SecureBulkLoadService
       Path p = new Path(srcPath);
       Path stageP = new Path(stagingDir,
           new Path(Bytes.toString(family), p.getName()));
+
+      // In case of replication of bulk loaded files, the hfiles are not renamed by the endpoint
+      // during the prepare stage, so there is no need to rename them back here
+      if (p.equals(stageP)) {
+        LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+        return;
+      }
+
       LOG.debug("Moving " + stageP + " back to " + p);
       if(!fs.rename(stageP, p))
         throw new IOException("Failed to move HFile: " + stageP + " to " + p);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
new file mode 100644
index 0000000..87db386
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -0,0 +1,264 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.cleaner;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
+import org.apache.hadoop.hbase.replication.regionserver.Replication;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+@Category({ MasterTests.class, SmallTests.class })
+public class TestReplicationHFileCleaner {
+  private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static Server server;
+  private static ReplicationQueues rq;
+  private static ReplicationPeers rp;
+  private static final String peerId = "TestReplicationHFileCleaner";
+  private static Configuration conf = TEST_UTIL.getConfiguration();
+  static FileSystem fs = null;
+  Path root;
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniZKCluster();
+    server = new DummyServer();
+    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    Replication.decorateMasterConfiguration(conf);
+    rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
+    rp.init();
+
+    rq = ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
+    rq.init(server.getServerName().toString());
+    try {
+      fs = FileSystem.get(conf);
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+    }
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniZKCluster();
+  }
+
+  @Before
+  public void setup() throws ReplicationException, IOException {
+    root = TEST_UTIL.getDataTestDirOnTestFS();
+    rp.addPeer(peerId, new ReplicationPeerConfig().setClusterKey(TEST_UTIL.getClusterKey()), null);
+  }
+
+  @After
+  public void cleanup() throws ReplicationException {
+    try {
+      fs.delete(root, true);
+    } catch (IOException e) {
+      LOG.warn("Failed to delete files recursively from path " + root);
+    }
+    rp.removePeer(peerId);
+  }
+
+  @Test
+  public void testIsFileDeletable() throws IOException, ReplicationException {
+    // 1. Create a file
+    Path file = new Path(root, "testIsFileDeletableWithNoHFileRefs");
+    fs.createNewFile(file);
+    // 2. Assert file is successfully created
+    assertTrue("Test file not created!", fs.exists(file));
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+    // 3. Assert that file as is should be deletable
+    assertTrue("Cleaner should allow to delete this file as there is no hfile reference node "
+        + "for it in the queue.",
+      cleaner.isFileDeletable(fs.getFileStatus(file)));
+
+    List<String> files = new ArrayList<String>(1);
+    files.add(file.getName());
+    // 4. Add the file to hfile-refs queue
+    rq.addHFileRefs(peerId, files);
+    // 5. Assert file should not be deletable
+    assertFalse("Cleaner should not allow to delete this file as there is a hfile reference node "
+        + "for it in the queue.",
+      cleaner.isFileDeletable(fs.getFileStatus(file)));
+  }
+
+  @Test
+  public void testGetDeletableFiles() throws Exception {
+    // 1. Create two files and assert that they are created
+    Path notDeletablefile = new Path(root, "testGetDeletableFiles_1");
+    fs.createNewFile(notDeletablefile);
+    assertTrue("Test file not created!", fs.exists(notDeletablefile));
+    Path deletablefile = new Path(root, "testGetDeletableFiles_2");
+    fs.createNewFile(deletablefile);
+    assertTrue("Test file not created!", fs.exists(deletablefile));
+
+    List<FileStatus> files = new ArrayList<FileStatus>(2);
+    FileStatus f = new FileStatus();
+    f.setPath(deletablefile);
+    files.add(f);
+    f = new FileStatus();
+    f.setPath(notDeletablefile);
+    files.add(f);
+
+    List<String> hfiles = new ArrayList<>(1);
+    hfiles.add(notDeletablefile.getName());
+    // 2. Add one file to hfile-refs queue
+    rq.addHFileRefs(peerId, hfiles);
+
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+    Iterator<FileStatus> deletableFilesIterator = cleaner.getDeletableFiles(files).iterator();
+    int i = 0;
+    while (deletableFilesIterator.hasNext() && i < 2) {
+      i++;
+    }
+    // 3. Assert that only the deletable file is returned and the referenced file is not
+    if (i > 2) {
+      fail("File " + notDeletablefile
+          + " should not be deletable as its hfile reference node is not added.");
+    }
+    assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
+  }
+
+  /*
+   * Test for HBASE-14621. This test does not assert anything directly. Without the fix the test
+   * will end up in an infinite loop, so it will time out.
+   */
+  @Test(timeout = 15000)
+  public void testForDifferntHFileRefsZnodeVersion() throws Exception {
+    // 1. Create a file
+    Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion");
+    fs.createNewFile(file);
+    // 2. Assert file is successfully created
+    assertTrue("Test file not created!", fs.exists(file));
+    ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
+    cleaner.setConf(conf);
+
+    ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
+    // Return a different znode version for each call
+    Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
+
+    Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
+    Field rqc = cleanerClass.getDeclaredField("rqc");
+    rqc.setAccessible(true);
+    rqc.set(cleaner, replicationQueuesClient);
+
+    cleaner.isFileDeletable(fs.getFileStatus(file));
+  }
+
+  static class DummyServer implements Server {
+
+    @Override
+    public Configuration getConfiguration() {
+      return TEST_UTIL.getConfiguration();
+    }
+
+    @Override
+    public ZooKeeperWatcher getZooKeeper() {
+      try {
+        return new ZooKeeperWatcher(getConfiguration(), "dummy server", this);
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+      return null;
+    }
+
+    @Override
+    public CoordinatedStateManager getCoordinatedStateManager() {
+      return null;
+    }
+
+    @Override
+    public ClusterConnection getConnection() {
+      return null;
+    }
+
+    @Override
+    public MetaTableLocator getMetaTableLocator() {
+      return null;
+    }
+
+    @Override
+    public ServerName getServerName() {
+      return ServerName.valueOf("regionserver,60020,000000");
+    }
+
+    @Override
+    public void abort(String why, Throwable e) {
+    }
+
+    @Override
+    public boolean isAborted() {
+      return false;
+    }
+
+    @Override
+    public void stop(String why) {
+    }
+
+    @Override
+    public boolean isStopped() {
+      return false;
+    }
+
+    @Override
+    public ChoreService getChoreService() {
+      return null;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index f463f76..abe484e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -19,12 +19,14 @@
 package org.apache.hadoop.hbase.replication;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
@@ -89,4 +91,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
   public String getStats() {
     return "";
   }
+
+  @Override
+  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+      throws ReplicationException {
+    return;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 455a790..e919c24 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,15 +19,21 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -35,7 +41,9 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -48,12 +56,17 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
@@ -79,6 +92,7 @@ public class TestMasterReplication {
 
   private static final TableName tableName = TableName.valueOf("test");
   private static final byte[] famName = Bytes.toBytes("f");
+  private static final byte[] famName1 = Bytes.toBytes("f1");
   private static final byte[] row = Bytes.toBytes("row");
   private static final byte[] row1 = Bytes.toBytes("row1");
   private static final byte[] row2 = Bytes.toBytes("row2");
@@ -103,7 +117,11 @@ public class TestMasterReplication {
     baseConfiguration.setInt("hbase.regionserver.maxlogs", 10);
     baseConfiguration.setLong("hbase.master.logcleaner.ttl", 10);
     baseConfiguration.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
-        HConstants.REPLICATION_ENABLE_DEFAULT);
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    baseConfiguration.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    baseConfiguration.set("hbase.replication.source.fs.conf.provider",
+      TestSourceFSConfigurationProvider.class.getCanonicalName());
+    baseConfiguration.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
     baseConfiguration.setBoolean("dfs.support.append", true);
     baseConfiguration.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
     baseConfiguration.setStrings(
@@ -114,6 +132,9 @@ public class TestMasterReplication {
     HColumnDescriptor fam = new HColumnDescriptor(famName);
     fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
     table.addFamily(fam);
+    fam = new HColumnDescriptor(famName1);
+    fam.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    table.addFamily(fam);
     fam = new HColumnDescriptor(noRepfamName);
     table.addFamily(fam);
   }
@@ -130,14 +151,7 @@ public class TestMasterReplication {
     int numClusters = 2;
     Table[] htables = null;
     try {
-      startMiniClusters(numClusters);
-      createTableOnClusters(table);
-
-      htables = getHTablesOnClusters(tableName);
-
-      // Test the replication scenarios of 0 -> 1 -> 0
-      addPeer("1", 0, 1);
-      addPeer("1", 1, 0);
+      htables = setUpClusterTablesAndPeers(numClusters);
 
       int[] expectedCounts = new int[] { 2, 2 };
 
@@ -157,12 +171,64 @@ public class TestMasterReplication {
   }
 
   /**
-   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
-   * deleting rows to a table in each clusters and ensuring that the each of
-   * these clusters get the appropriate mutations. It also tests the grouping
-   * scenario where a cluster needs to replicate the edits originating from
-   * itself and also the edits that it received using replication from a
-   * different cluster. The scenario is explained in HBASE-9158
+   * Tests the replication scenario 0 -> 1 -> 0 by bulk loading a set of HFiles to a table in each
+   * cluster and checking whether they are replicated.
+   */
+  @Test(timeout = 300000)
+  public void testHFileCyclicReplication() throws Exception {
+    LOG.info("testHFileCyclicReplication");
+    int numClusters = 2;
+    Table[] htables = null;
+    try {
+      htables = setUpClusterTablesAndPeers(numClusters);
+
+      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been replicated
+      // to cluster '1'.
+      byte[][][] hfileRanges =
+          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+      int numOfRows = 100;
+      int[] expectedCounts =
+          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_01", 0, new int[] { 1 }, row,
+        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+      // Load 200 rows for each hfile range in cluster '1' and validate whether it has been replicated
+      // to cluster '0'.
+      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+      numOfRows = 200;
+      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+          hfileRanges.length * numOfRows + expectedCounts[1] };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_10", 1, new int[] { 0 }, row,
+        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
+
+  private Table[] setUpClusterTablesAndPeers(int numClusters) throws Exception {
+    Table[] htables;
+    startMiniClusters(numClusters);
+    createTableOnClusters(table);
+
+    htables = getHTablesOnClusters(tableName);
+    // Test the replication scenarios of 0 -> 1 -> 0
+    addPeer("1", 0, 1);
+    addPeer("1", 1, 0);
+    return htables;
+  }
+
+  /**
+   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and deleting rows to a
+   * table in each cluster and ensuring that each of these clusters gets the appropriate
+   * mutations. It also tests the grouping scenario where a cluster needs to replicate the edits
+   * originating from itself and also the edits that it received using replication from a different
+   * cluster. The scenario is explained in HBASE-9158.
    */
   @Test(timeout = 300000)
   public void testCyclicReplication2() throws Exception {
@@ -213,6 +279,119 @@ public class TestMasterReplication {
   }
 
   /**
+   * Tests the multi-slave hfile replication scenario 0 -> 1, 2 by bulk loading a set of HFiles to
+   * a table in the master cluster and checking whether they are replicated to its peers.
+   */
+  @Test(timeout = 300000)
+  public void testHFileMultiSlaveReplication() throws Exception {
+    LOG.info("testHFileMultiSlaveReplication");
+    int numClusters = 3;
+    Table[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      // Add a slave, 0 -> 1
+      addPeer("1", 0, 1);
+
+      htables = getHTablesOnClusters(tableName);
+
+      // Load 100 rows for each hfile range in cluster '0' and validate whether it has been replicated
+      // to cluster '1'.
+      byte[][][] hfileRanges =
+          new byte[][][] { new byte[][] { Bytes.toBytes("mmmm"), Bytes.toBytes("oooo") },
+              new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("rrr") }, };
+      int numOfRows = 100;
+
+      int[] expectedCounts =
+          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_0", 0, new int[] { 1 }, row,
+        famName, htables, hfileRanges, numOfRows, expectedCounts, true);
+
+      // Validate data is not replicated to cluster '2'.
+      assertEquals(0, utilities[2].countRows(htables[2]));
+
+      rollWALAndWait(utilities[0], htables[0].getName(), row);
+
+      // Add one more slave, 0 -> 2
+      addPeer("2", 0, 2);
+
+      // Load 200 rows for each hfile range in cluster '0' and validate whether it has been replicated
+      // to cluster '1' and '2'. Previous data should be replicated to cluster '2'.
+      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("ssss"), Bytes.toBytes("uuuu") },
+          new byte[][] { Bytes.toBytes("vvv"), Bytes.toBytes("xxx") }, };
+      numOfRows = 200;
+
+      int[] newExpectedCounts = new int[] { hfileRanges.length * numOfRows + expectedCounts[0],
+          hfileRanges.length * numOfRows + expectedCounts[1], hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("testHFileCyclicReplication_1", 0, new int[] { 1, 2 }, row,
+        famName, htables, hfileRanges, numOfRows, newExpectedCounts, true);
+
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
+  
+  /**
+   * Tests bulk loaded hfile replication restricted to explicitly specified table column families.
+   * It does so by bulk loading a set of HFiles belonging to both CFs of the table while only one
+   * CF is configured for replication.
+   */
+  @Test(timeout = 300000)
+  public void testHFileReplicationForConfiguredTableCfs() throws Exception {
+    LOG.info("testHFileReplicationForConfiguredTableCfs");
+    int numClusters = 2;
+    Table[] htables = null;
+    try {
+      startMiniClusters(numClusters);
+      createTableOnClusters(table);
+
+      htables = getHTablesOnClusters(tableName);
+      // Test the replication scenario where only 'f' is configured for table data replication, not 'f1'
+      addPeer("1", 0, 1, tableName.getNameAsString() + ":" + Bytes.toString(famName));
+
+      // Load 100 rows for each hfile range in cluster '0' for table CF 'f'
+      byte[][][] hfileRanges =
+          new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+              new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("fff") }, };
+      int numOfRows = 100;
+      int[] expectedCounts =
+          new int[] { hfileRanges.length * numOfRows, hfileRanges.length * numOfRows };
+
+      loadAndValidateHFileReplication("load_f", 0, new int[] { 1 }, row, famName, htables,
+        hfileRanges, numOfRows, expectedCounts, true);
+
+      // Load 100 rows for each hfile range in cluster '0' for table CF 'f1'
+      hfileRanges = new byte[][][] { new byte[][] { Bytes.toBytes("gggg"), Bytes.toBytes("iiii") },
+          new byte[][] { Bytes.toBytes("jjj"), Bytes.toBytes("lll") }, };
+      numOfRows = 100;
+
+      int[] newExpectedCounts =
+          new int[] { hfileRanges.length * numOfRows + expectedCounts[0], expectedCounts[1] };
+
+      loadAndValidateHFileReplication("load_f1", 0, new int[] { 1 }, row, famName1, htables,
+        hfileRanges, numOfRows, newExpectedCounts, false);
+
+      // Validate data replication for CF 'f1'
+
+      // Source cluster table should contain data for the families
+      wait(0, htables[0], hfileRanges.length * numOfRows + expectedCounts[0]);
+
+      // Sleep for enough time so that the data is still not replicated for the CF which is not
+      // configured for replication
+      Thread.sleep((NB_RETRIES / 2) * SLEEP_TIME);
+      // Peer cluster should have only configured CF data
+      wait(1, htables[1], expectedCounts[1]);
+    } finally {
+      close(htables);
+      shutDownMiniClusters();
+    }
+  }
+
+  /**
    * Tests cyclic replication scenario of 0 -> 1 -> 2 -> 1.
    */
   @Test(timeout = 300000)
@@ -328,6 +507,17 @@ public class TestMasterReplication {
       close(replicationAdmin);
     }
   }
+  
+  private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
+      throws Exception {
+    ReplicationAdmin replicationAdmin = null;
+    try {
+      replicationAdmin = new ReplicationAdmin(configurations[masterClusterNumber]);
+      replicationAdmin.addPeer(id, utilities[slaveClusterNumber].getClusterKey(), tableCfs);
+    } finally {
+      close(replicationAdmin);
+    }
+  }
 
   private void disablePeer(String id, int masterClusterNumber) throws Exception {
     ReplicationAdmin replicationAdmin = null;
@@ -405,8 +595,56 @@ public class TestMasterReplication {
     wait(row, target, false);
   }
 
-  private void wait(byte[] row, Table target, boolean isDeleted)
-      throws Exception {
+  private void loadAndValidateHFileReplication(String testName, int masterNumber,
+      int[] slaveNumbers, byte[] row, byte[] fam, Table[] tables, byte[][][] hfileRanges,
+      int numOfRows, int[] expectedCounts, boolean toValidate) throws Exception {
+    HBaseTestingUtility util = utilities[masterNumber];
+
+    Path dir = util.getDataTestDirOnTestFS(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(util.getConfiguration(), fs,
+        new Path(familyDir, "hfile_" + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    Table source = tables[masterNumber];
+    final TableName tableName = source.getName();
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String[] args = { dir.toString(), tableName.toString() };
+    loader.run(args);
+
+    if (toValidate) {
+      for (int slaveClusterNumber : slaveNumbers) {
+        wait(slaveClusterNumber, tables[slaveClusterNumber], expectedCounts[slaveClusterNumber]);
+      }
+    }
+  }
+
+  private void wait(int slaveNumber, Table target, int expectedCount)
+      throws IOException, InterruptedException {
+    int count = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for bulkloaded data replication. Current count=" + count
+            + ", expected count=" + expectedCount);
+      }
+      count = utilities[slaveNumber].countRows(target);
+      if (count != expectedCount) {
+        LOG.info("Waiting more time for bulkloaded data replication.");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  private void wait(byte[] row, Table target, boolean isDeleted) throws Exception {
     Get get = new Get(row);
     for (int i = 0; i < NB_RETRIES; i++) {
       if (i == NB_RETRIES - 1) {
@@ -430,6 +668,47 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+      final byte[] row) throws IOException {
+    final Admin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener.Base() {
+          @Override
+          public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+            latch.countDown();
+          }
+        };
+    region.getWAL().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+      region.getRegionInfo().getRegionName()));
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
+          "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getWAL().unregisterWALActionsListener(listener);
+  }
+
   /**
    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
    * timestamp there is otherwise no way to count them.

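The testHFileReplicationForConfiguredTableCfs case above exercises bulk load replication scoped to explicitly configured table column families. Outside of the test harness, the same scoping uses the ReplicationAdmin.addPeer(id, clusterKey, tableCfs) overload and the "table:cf" format the test passes; a minimal sketch with placeholder names and a placeholder cluster key:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;

    public class AddScopedPeerExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        ReplicationAdmin replicationAdmin = new ReplicationAdmin(conf);
        try {
          // Replicate only column family 'f' of table 'test' to peer "1".
          // "peer-zk:2181:/hbase" is a placeholder cluster key for the slave cluster.
          replicationAdmin.addPeer("1", "peer-zk:2181:/hbase", "test:f");
        } finally {
          replicationAdmin.close();
        }
      }
    }
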
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
index 4823597..47d2880 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
@@ -658,7 +658,8 @@ public class TestReplicationSmallTests extends TestReplicationBase {
     HRegionInfo hri = new HRegionInfo(htable1.getName(),
       HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
     WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
-    Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit);
+    Replication.scopeWALEdits(htable1.getTableDescriptor(), new WALKey(), edit,
+      htable1.getConfiguration(), null);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index 696c130..41c3240 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.*;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
@@ -160,6 +161,62 @@ public abstract class TestReplicationStateBasic {
   }
 
   @Test
+  public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
+    rp.init();
+    rq1.init(server1);
+    rqc.init();
+
+    List<String> files1 = new ArrayList<String>(3);
+    files1.add("file_1");
+    files1.add("file_2");
+    files1.add("file_3");
+    assertNull(rqc.getReplicableHFiles(ID_ONE));
+    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rq1.addHFileRefs(ID_ONE, files1);
+    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+    List<String> files2 = new ArrayList<>(files1);
+    String removedString = files2.remove(0);
+    rq1.removeHFileRefs(ID_ONE, files2);
+    assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
+    files2 = new ArrayList<>(1);
+    files2.add(removedString);
+    rq1.removeHFileRefs(ID_ONE, files2);
+    assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
+    rp.removePeer(ID_ONE);
+  }
+
+  @Test
+  public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
+    rq1.init(server1);
+    rqc.init();
+
+    rp.init();
+    rp.addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), null);
+    rp.addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), null);
+
+    List<String> files1 = new ArrayList<String>(3);
+    files1.add("file_1");
+    files1.add("file_2");
+    files1.add("file_3");
+    rq1.addHFileRefs(ID_ONE, files1);
+    rq1.addHFileRefs(ID_TWO, files1);
+    assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
+    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+    rp.removePeer(ID_ONE);
+    assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertNull(rqc.getReplicableHFiles(ID_ONE));
+    assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
+
+    rp.removePeer(ID_TWO);
+    assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
+    assertNull(rqc.getReplicableHFiles(ID_TWO));
+  }
+
+  @Test
   public void testReplicationPeers() throws Exception {
     rp.init();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index 4587c61..3b7402a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -64,6 +64,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
     utility = new HBaseTestingUtility();
     utility.startMiniZKCluster();
     conf = utility.getConfiguration();
+    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
     zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
index 13545b5..b36bb9a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpTool.java
@@ -52,15 +52,15 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
   private static final TableName t1_su = TableName.valueOf("t1_syncup");
   private static final TableName t2_su = TableName.valueOf("t2_syncup");
 
-  private static final byte[] famName = Bytes.toBytes("cf1");
+  protected static final byte[] famName = Bytes.toBytes("cf1");
   private static final byte[] qualName = Bytes.toBytes("q1");
 
-  private static final byte[] noRepfamName = Bytes.toBytes("norep");
+  protected static final byte[] noRepfamName = Bytes.toBytes("norep");
 
   private HTableDescriptor t1_syncupSource, t1_syncupTarget;
   private HTableDescriptor t2_syncupSource, t2_syncupTarget;
 
-  private Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
+  protected Table ht1Source, ht2Source, ht1TargetAtPeer1, ht2TargetAtPeer1;
 
   @Before
   public void setUp() throws Exception {
@@ -179,7 +179,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
 
   }
 
-  private void setupReplication() throws Exception {
+  protected void setupReplication() throws Exception {
     ReplicationAdmin admin1 = new ReplicationAdmin(conf1);
     ReplicationAdmin admin2 = new ReplicationAdmin(conf2);
 
@@ -418,7 +418,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
     }
   }
 
-  private void syncUp(HBaseTestingUtility ut) throws Exception {
+  protected void syncUp(HBaseTestingUtility ut) throws Exception {
     ReplicationSyncUp.setConfigure(ut.getConfiguration());
     String[] arguments = new String[] { null };
     new ReplicationSyncUp().run(arguments);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
new file mode 100644
index 0000000..f54c632
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, LargeTests.class })
+public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplicationSyncUpTool {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestReplicationSyncUpToolWithBulkLoadedData.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf1.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+    conf1.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
+    conf1.set("hbase.replication.source.fs.conf.provider",
+      TestSourceFSConfigurationProvider.class.getCanonicalName());
+    String classes = conf1.get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    if (!classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint")) {
+      classes = classes + ",org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint";
+      conf1.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, classes);
+    }
+
+    TestReplicationBase.setUpBeforeClass();
+  }
+
+  @Override
+  public void testSyncUpTool() throws Exception {
+    /**
+     * Set up Replication: on Master and one Slave.
+     * Tables: t1_syncup and t2_syncup; column family 'cf1' is replicated, 'norep' is not.
+     */
+    setupReplication();
+
+    /**
+     * Prepare 16 random hfile ranges required for creating hfiles
+     */
+    Iterator<String> randomHFileRangeListIterator = null;
+    Set<String> randomHFileRanges = new HashSet<String>(16);
+    for (int i = 0; i < 16; i++) {
+      randomHFileRanges.add(UUID.randomUUID().toString());
+    }
+    List<String> randomHFileRangeList = new ArrayList<>(randomHFileRanges);
+    Collections.sort(randomHFileRangeList);
+    randomHFileRangeListIterator = randomHFileRangeList.iterator();
+
+    /**
+     * At Master:
+     *   t1_syncup: Load 100 rows into cf1, and 3 rows into norep
+     *   t2_syncup: Load 200 rows into cf1, and 3 rows into norep
+     * Verify the data is correctly replicated to the slave.
+     */
+    loadAndReplicateHFiles(true, randomHFileRangeListIterator);
+
+    /**
+     * Verify hfile load works:
+     * step 1: stop hbase on Slave
+     * step 2: at Master:
+     *   t1_syncup: Load another 100 rows into cf1 and 3 rows into norep
+     *   t2_syncup: Load another 200 rows into cf1 and 3 rows into norep
+     * step 3: stop hbase on Master, restart hbase on Slave
+     * step 4: verify Slave still has the rows loaded before the shutdown
+     *   t1_syncup: 100 rows from cf1
+     *   t2_syncup: 200 rows from cf1
+     * step 5: run the syncup tool on Master
+     * step 6: verify that the hfiles show up on Slave and 'norep' does not
+     *   t1_syncup: 200 rows from cf1
+     *   t2_syncup: 400 rows from cf1
+     */
+    mimicSyncUpAfterBulkLoad(randomHFileRangeListIterator);
+
+  }
+
+  private void mimicSyncUpAfterBulkLoad(Iterator<String> randomHFileRangeListIterator)
+      throws Exception {
+    LOG.debug("mimicSyncUpAfterBulkLoad");
+    utility2.shutdownMiniHBaseCluster();
+
+    loadAndReplicateHFiles(false, randomHFileRangeListIterator);
+
+    int rowCount_ht1Source = utility1.countRows(ht1Source);
+    assertEquals("t1_syncup has 206 rows on source, after bulk load of another 103 hfiles", 206,
+      rowCount_ht1Source);
+
+    int rowCount_ht2Source = utility1.countRows(ht2Source);
+    assertEquals("t2_syncup has 406 rows on source, after bulk load of another 203 hfiles", 406,
+      rowCount_ht2Source);
+
+    utility1.shutdownMiniHBaseCluster();
+    utility2.restartHBaseCluster(1);
+
+    Thread.sleep(SLEEP_TIME);
+
+    // Before sync up
+    int rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+    int rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+    assertEquals("@Peer1 t1_syncup should still have 100 rows", 100, rowCount_ht1TargetAtPeer1);
+    assertEquals("@Peer1 t2_syncup should still have 200 rows", 200, rowCount_ht2TargetAtPeer1);
+
+    // Run sync up tool
+    syncUp(utility1);
+
+    // After sync up
+    for (int i = 0; i < NB_RETRIES; i++) {
+      syncUp(utility1);
+      rowCount_ht1TargetAtPeer1 = utility2.countRows(ht1TargetAtPeer1);
+      rowCount_ht2TargetAtPeer1 = utility2.countRows(ht2TargetAtPeer1);
+      if (i == NB_RETRIES - 1) {
+        if (rowCount_ht1TargetAtPeer1 != 200 || rowCount_ht2TargetAtPeer1 != 400) {
+          // syncUp still failed. Let's look at the source in case anything is wrong there
+          utility1.restartHBaseCluster(1);
+          rowCount_ht1Source = utility1.countRows(ht1Source);
+          LOG.debug("t1_syncup should have 206 rows at source, and it is " + rowCount_ht1Source);
+          rowCount_ht2Source = utility1.countRows(ht2Source);
+          LOG.debug("t2_syncup should have 406 rows at source, and it is " + rowCount_ht2Source);
+        }
+        assertEquals("@Peer1 t1_syncup should be sync up and have 200 rows", 200,
+          rowCount_ht1TargetAtPeer1);
+        assertEquals("@Peer1 t2_syncup should be sync up and have 400 rows", 400,
+          rowCount_ht2TargetAtPeer1);
+      }
+      if (rowCount_ht1TargetAtPeer1 == 200 && rowCount_ht2TargetAtPeer1 == 400) {
+        LOG.info("SyncUpAfterBulkLoad succeeded at retry = " + i);
+        break;
+      } else {
+        LOG.debug("SyncUpAfterBulkLoad failed at retry = " + i + ", with rowCount_ht1TargetPeer1 ="
+            + rowCount_ht1TargetAtPeer1 + " and rowCount_ht2TargetAtPeer1 ="
+            + rowCount_ht2TargetAtPeer1);
+      }
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+
+  private void loadAndReplicateHFiles(boolean verifyReplicationOnSlave,
+      Iterator<String> randomHFileRangeListIterator) throws Exception {
+    LOG.debug("loadAndReplicateHFiles");
+
+    // Load 100 + 3 hfiles to t1_syncup.
+    byte[][][] hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht1Source, hfileRanges,
+      100);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht1Source,
+      hfileRanges, 3);
+
+    // Load 200 + 3 hfiles to t2_syncup.
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, famName, ht2Source, hfileRanges,
+      200);
+
+    hfileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes(randomHFileRangeListIterator.next()),
+            Bytes.toBytes(randomHFileRangeListIterator.next()) } };
+    loadAndValidateHFileReplication("HFileReplication_1", row, noRepfamName, ht2Source,
+      hfileRanges, 3);
+
+    if (verifyReplicationOnSlave) {
+      // ensure replication completed
+      wait(ht1TargetAtPeer1, utility1.countRows(ht1Source) - 3,
+        "t1_syncup has 103 rows on source, and 100 on slave1");
+
+      wait(ht2TargetAtPeer1, utility1.countRows(ht2Source) - 3,
+        "t2_syncup has 203 rows on source, and 200 on slave1");
+    }
+  }
+
+  private void loadAndValidateHFileReplication(String testName, byte[] row, byte[] fam,
+      Table source, byte[][][] hfileRanges, int numOfRows) throws Exception {
+    Path dir = utility1.getDataTestDirOnTestFS(testName);
+    FileSystem fs = utility1.getTestFileSystem();
+    dir = dir.makeQualified(fs);
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      HFileTestUtil.createHFile(utility1.getConfiguration(), fs, new Path(familyDir, "hfile_"
+          + hfileIdx++), fam, row, from, to, numOfRows);
+    }
+
+    final TableName tableName = source.getName();
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
+    String[] args = { dir.toString(), tableName.toString() };
+    loader.run(args);
+  }
+
+  private void wait(Table target, int expectedCount, String msg) throws IOException,
+      InterruptedException {
+    for (int i = 0; i < NB_RETRIES; i++) {
+      int rowCount = utility2.countRows(target);
+      if (i == NB_RETRIES - 1) {
+        assertEquals(msg, expectedCount, rowCount);
+      }
+      if (expectedCount == rowCount) {
+        break;
+      }
+      Thread.sleep(SLEEP_TIME);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index b87e7ef..f08d2bf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -21,32 +21,52 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.UUID;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -58,21 +78,18 @@ public class TestReplicationSink {
   private static final Log LOG = LogFactory.getLog(TestReplicationSink.class);
   private static final int BATCH_SIZE = 10;
 
-  private final static HBaseTestingUtility TEST_UTIL =
-      new HBaseTestingUtility();
+  protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private static ReplicationSink SINK;
+  protected static ReplicationSink SINK;
 
-  private static final TableName TABLE_NAME1 =
-      TableName.valueOf("table1");
-  private static final TableName TABLE_NAME2 =
-      TableName.valueOf("table2");
+  protected static final TableName TABLE_NAME1 = TableName.valueOf("table1");
+  protected static final TableName TABLE_NAME2 = TableName.valueOf("table2");
 
-  private static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
-  private static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
+  protected static final byte[] FAM_NAME1 = Bytes.toBytes("info1");
+  protected static final byte[] FAM_NAME2 = Bytes.toBytes("info2");
 
-  private static Table table1;
-  private static Stoppable STOPPABLE = new Stoppable() {
+  protected static Table table1;
+  protected static Stoppable STOPPABLE = new Stoppable() {
     final AtomicBoolean stop = new AtomicBoolean(false);
 
     @Override
@@ -85,10 +102,13 @@ public class TestReplicationSink {
       LOG.info("STOPPING BECAUSE: " + why);
       this.stop.set(true);
     }
-    
+
   };
 
-  private static Table table2;
+  protected static Table table2;
+  protected static String baseNamespaceDir;
+  protected static String hfileArchiveDir;
+  protected static String replicationClusterId;
 
    /**
    * @throws java.lang.Exception
@@ -98,11 +118,18 @@ public class TestReplicationSink {
     TEST_UTIL.getConfiguration().setBoolean("dfs.support.append", true);
     TEST_UTIL.getConfiguration().setBoolean(HConstants.REPLICATION_ENABLE_KEY,
         HConstants.REPLICATION_ENABLE_DEFAULT);
+    TEST_UTIL.getConfiguration().set("hbase.replication.source.fs.conf.provider",
+      TestSourceFSConfigurationProvider.class.getCanonicalName());
+
     TEST_UTIL.startMiniCluster(3);
     SINK =
       new ReplicationSink(new Configuration(TEST_UTIL.getConfiguration()), STOPPABLE);
     table1 = TEST_UTIL.createTable(TABLE_NAME1, FAM_NAME1);
     table2 = TEST_UTIL.createTable(TABLE_NAME2, FAM_NAME2);
+    Path rootDir = FSUtils.getRootDir(TEST_UTIL.getConfiguration());
+    baseNamespaceDir = new Path(rootDir, new Path(HConstants.BASE_NAMESPACE_DIR)).toString();
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY)).toString();
+    replicationClusterId = "12345";
   }
 
   /**
@@ -134,7 +161,8 @@ public class TestReplicationSink {
     for(int i = 0; i < BATCH_SIZE; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Scan scan = new Scan();
     ResultScanner scanRes = table1.getScanner(scan);
     assertEquals(BATCH_SIZE, scanRes.next(BATCH_SIZE).length);
@@ -151,7 +179,8 @@ public class TestReplicationSink {
     for(int i = 0; i < BATCH_SIZE/2; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells), replicationClusterId,
+      baseNamespaceDir, hfileArchiveDir);
 
     entries = new ArrayList<WALEntry>(BATCH_SIZE);
     cells = new ArrayList<Cell>();
@@ -160,7 +189,8 @@ public class TestReplicationSink {
           i % 2 != 0 ? KeyValue.Type.Put: KeyValue.Type.DeleteColumn, cells));
     }
 
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Scan scan = new Scan();
     ResultScanner scanRes = table1.getScanner(scan);
     assertEquals(BATCH_SIZE/2, scanRes.next(BATCH_SIZE).length);
@@ -179,7 +209,8 @@ public class TestReplicationSink {
               i, KeyValue.Type.Put, cells));
     }
 
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Scan scan = new Scan();
     ResultScanner scanRes = table2.getScanner(scan);
     for(Result res : scanRes) {
@@ -198,14 +229,16 @@ public class TestReplicationSink {
     for(int i = 0; i < 3; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     entries = new ArrayList<WALEntry>(3);
     cells = new ArrayList<Cell>();
     entries.add(createEntry(TABLE_NAME1, 0, KeyValue.Type.DeleteColumn, cells));
     entries.add(createEntry(TABLE_NAME1, 1, KeyValue.Type.DeleteFamily, cells));
     entries.add(createEntry(TABLE_NAME1, 2, KeyValue.Type.DeleteColumn, cells));
 
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
 
     Scan scan = new Scan();
     ResultScanner scanRes = table1.getScanner(scan);
@@ -228,12 +261,96 @@ public class TestReplicationSink {
     for(int i = 3; i < 5; i++) {
       entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
     }
-    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()));
+    SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+      replicationClusterId, baseNamespaceDir, hfileArchiveDir);
     Get get = new Get(Bytes.toBytes(1));
     Result res = table1.get(get);
     assertEquals(0, res.size());
   }
 
+  /**
+   * Test replicateEntries with a bulk load entry for 25 HFiles
+   */
+  @Test
+  public void testReplicateEntriesForHFiles() throws Exception {
+    Path dir = TEST_UTIL.getDataTestDirOnTestFS("testReplicateEntries");
+    Path familyDir = new Path(dir, Bytes.toString(FAM_NAME1));
+    int numRows = 10;
+
+    List<Path> p = new ArrayList<>(1);
+
+    // 1. Generate 25 hfile ranges
+    Random rng = new SecureRandom();
+    Set<Integer> numbers = new HashSet<>();
+    while (numbers.size() < 50) {
+      numbers.add(rng.nextInt(1000));
+    }
+    List<Integer> numberList = new ArrayList<>(numbers);
+    Collections.sort(numberList);
+
+    // 2. Create 25 hfiles
+    Configuration conf = TEST_UTIL.getConfiguration();
+    FileSystem fs = dir.getFileSystem(conf);
+    Iterator<Integer> numbersItr = numberList.iterator();
+    for (int i = 0; i < 25; i++) {
+      Path hfilePath = new Path(familyDir, "hfile_" + i);
+      HFileTestUtil.createHFile(conf, fs, hfilePath, FAM_NAME1, FAM_NAME1,
+        Bytes.toBytes(numbersItr.next()), Bytes.toBytes(numbersItr.next()), numRows);
+      p.add(hfilePath);
+    }
+
+    // 3. Create a BulkLoadDescriptor and a WALEdit
+    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+    storeFiles.put(FAM_NAME1, p);
+    WALEdit edit = null;
+    WALProtos.BulkLoadDescriptor loadDescriptor = null;
+
+    try (Connection c = ConnectionFactory.createConnection(conf);
+        RegionLocator l = c.getRegionLocator(TABLE_NAME1)) {
+      HRegionInfo regionInfo = l.getAllRegionLocations().get(0).getRegionInfo();
+      loadDescriptor =
+          ProtobufUtil.toBulkLoadDescriptor(TABLE_NAME1,
+            ByteStringer.wrap(regionInfo.getEncodedNameAsBytes()), storeFiles, 1);
+      edit = WALEdit.createBulkLoadEvent(regionInfo, loadDescriptor);
+    }
+    List<WALEntry> entries = new ArrayList<WALEntry>(1);
+
+    // 4. Create a WALEntryBuilder
+    WALEntry.Builder builder = createWALEntryBuilder(TABLE_NAME1);
+
+    // 5. Copy an hfile to each store file path the sink expects under the base namespace dir
+    for (int i = 0; i < 25; i++) {
+      String pathToHfileFromNS =
+          new StringBuilder(100).append(TABLE_NAME1.getNamespaceAsString()).append(Path.SEPARATOR)
+              .append(Bytes.toString(TABLE_NAME1.getName())).append(Path.SEPARATOR)
+              .append(Bytes.toString(loadDescriptor.getEncodedRegionName().toByteArray()))
+              .append(Path.SEPARATOR).append(Bytes.toString(FAM_NAME1)).append(Path.SEPARATOR)
+              .append("hfile_" + i).toString();
+      String dst = baseNamespaceDir + Path.SEPARATOR + pathToHfileFromNS;
+
+      FileUtil.copy(fs, p.get(0), fs, new Path(dst), false, conf);
+    }
+
+    entries.add(builder.build());
+    ResultScanner scanRes = null;
+    try {
+      Scan scan = new Scan();
+      scanRes = table1.getScanner(scan);
+      // 6. Assert no existing data in table
+      assertEquals(0, scanRes.next(numRows).length);
+      // 7. Replicate the bulk loaded entry
+      SINK.replicateEntries(entries, CellUtil.createCellScanner(edit.getCells().iterator()),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      scanRes = table1.getScanner(scan);
+      // 8. Assert data is replicated
+      assertEquals(numRows, scanRes.next(numRows).length);
+    } finally {
+      if (scanRes != null) {
+        scanRes.close();
+      }
+    }
+  }
+
   private WALEntry createEntry(TableName table, int row,  KeyValue.Type type, List<Cell> cells) {
     byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
     byte[] rowBytes = Bytes.toBytes(row);
@@ -256,6 +373,13 @@ public class TestReplicationSink {
         kv = new KeyValue(rowBytes, fam, null,
             now, KeyValue.Type.DeleteFamily);
     }
+    WALEntry.Builder builder = createWALEntryBuilder(table);
+    cells.add(kv);
+
+    return builder.build();
+  }
+
+  private WALEntry.Builder createWALEntryBuilder(TableName table) {
     WALEntry.Builder builder = WALEntry.newBuilder();
     builder.setAssociatedCellCount(1);
     WALKey.Builder keyBuilder = WALKey.newBuilder();
@@ -264,13 +388,10 @@ public class TestReplicationSink {
     uuidBuilder.setMostSigBits(HConstants.DEFAULT_CLUSTER_ID.getMostSignificantBits());
     keyBuilder.setClusterId(uuidBuilder.build());
     keyBuilder.setTableName(ByteStringer.wrap(table.getName()));
-    keyBuilder.setWriteTime(now);
+    keyBuilder.setWriteTime(System.currentTimeMillis());
     keyBuilder.setEncodedRegionName(ByteStringer.wrap(HConstants.EMPTY_BYTE_ARRAY));
     keyBuilder.setLogSequenceNumber(-1);
     builder.setKey(keyBuilder.build());
-    cells.add(kv);
-
-    return builder.build();
+    return builder;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index d50522c..a208120 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -19,13 +19,17 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.net.URLEncoder;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.NavigableMap;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
@@ -51,6 +55,8 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -64,6 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL;
@@ -108,6 +115,8 @@ public class TestReplicationSourceManager {
 
   private static final byte[] f1 = Bytes.toBytes("f1");
 
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
   private static final TableName test =
       TableName.valueOf("test");
 
@@ -161,10 +170,10 @@ public class TestReplicationSourceManager {
     manager.addSource(slaveId);
 
     htd = new HTableDescriptor(test);
-    HColumnDescriptor col = new HColumnDescriptor("f1");
+    HColumnDescriptor col = new HColumnDescriptor(f1);
     col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
     htd.addFamily(col);
-    col = new HColumnDescriptor("f2");
+    col = new HColumnDescriptor(f2);
     col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
     htd.addFamily(col);
 
@@ -416,6 +425,63 @@ public class TestReplicationSourceManager {
     s0.abort("", null);
   }
 
+  @Test
+  public void testBulkLoadWALEditsWithoutBulkLoadReplicationEnabled() throws Exception {
+    // 1. Create wal key
+    WALKey logKey = new WALKey();
+    // 2. Get the bulk load wal edit event
+    WALEdit logEdit = getBulkLoadWALEdit();
+
+    // 3. Get the scopes for the key
+    Replication.scopeWALEdits(htd, logKey, logEdit, conf, manager);
+
+    // 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
+    assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
+      logKey.getScopes());
+  }
+
+  @Test
+  public void testBulkLoadWALEdits() throws Exception {
+    // 1. Create wal key
+    WALKey logKey = new WALKey();
+    // 2. Get the bulk load wal edit event
+    WALEdit logEdit = getBulkLoadWALEdit();
+    // 3. Enable bulk load hfile replication
+    Configuration bulkLoadConf = HBaseConfiguration.create(conf);
+    bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+
+    // 4. Get the scopes for the key
+    Replication.scopeWALEdits(htd, logKey, logEdit, bulkLoadConf, manager);
+
+    NavigableMap<byte[], Integer> scopes = logKey.getScopes();
+    // Assert family with replication scope global is present in the key scopes
+    assertTrue("This family scope is set to global, should be part of replication key scopes.",
+      scopes.containsKey(f1));
+    // Assert family with replication scope local is not present in the key scopes
+    assertFalse("This family scope is set to local, should not be part of replication key scopes.",
+      scopes.containsKey(f2));
+  }
+
+  private WALEdit getBulkLoadWALEdit() {
+    // 1. Create store files for the families
+    Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
+    List<Path> p = new ArrayList<>(1);
+    p.add(new Path(Bytes.toString(f1)));
+    storeFiles.put(f1, p);
+
+    p = new ArrayList<>(1);
+    p.add(new Path(Bytes.toString(f2)));
+    storeFiles.put(f2, p);
+
+    // 2. Create bulk load descriptor
+    BulkLoadDescriptor desc = ProtobufUtil.toBulkLoadDescriptor(hri.getTable(),
+      ByteStringer.wrap(hri.getEncodedNameAsBytes()), storeFiles, 1);
+
+    // 3. create bulk load wal edit event
+    WALEdit logEdit = WALEdit.createBulkLoadEvent(hri, desc);
+    return logEdit;
+  }
+
   static class DummyNodeFailoverWorker extends Thread {
     private SortedMap<String, SortedSet<String>> logZnodesMap;
     Server server;

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..a14c02b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSourceFSConfigurationProvider.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class TestSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+  @Override
+  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+      throws IOException {
+    return sinkConf;
+  }
+}
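
The stub above just hands back the sink's own configuration. For context, a hypothetical provider (modelled loosely on the DefaultSourceFSConfigurationProvider added by this change) might instead layer the source cluster's file system client configuration on top of the sink's; the <hbase.replication.conf.dir>/<replicationClusterId>/*.xml layout and all names below are assumptions, not part of the patch:

package org.apache.hadoop.hbase.replication.regionserver;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;

public class ExampleSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
  @Override
  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
      throws IOException {
    String confDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR);
    if (confDir == null) {
      // Nothing configured; fall back to the sink's own view of the file system.
      return sinkConf;
    }
    // Layer every *.xml under <conf dir>/<replication cluster id> on top of the sink config.
    Configuration sourceConf = new Configuration(sinkConf);
    File clusterDir = new File(confDir, replicationClusterId);
    File[] files = clusterDir.listFiles();
    if (files == null) {
      throw new IOException("No source cluster configuration found under " + clusterDir);
    }
    for (File file : files) {
      if (file.getName().endsWith(".xml")) {
        sourceConf.addResource(new Path(file.getAbsolutePath()));
      }
    }
    return sourceConf;
  }
}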


[3/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)

Posted by ra...@apache.org.
HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26ac60b0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26ac60b0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26ac60b0

Branch: refs/heads/master
Commit: 26ac60b03f80c9215103a02db783341e67037753
Parents: 9647fee
Author: ramkrishna <ra...@gmail.com>
Authored: Thu Dec 10 13:07:46 2015 +0530
Committer: ramkrishna <ra...@gmail.com>
Committed: Thu Dec 10 13:07:46 2015 +0530

----------------------------------------------------------------------
 .../hbase/replication/ReplicationPeers.java     |   2 +-
 .../replication/ReplicationPeersZKImpl.java     |  26 +-
 .../hbase/replication/ReplicationQueues.java    |  25 +-
 .../replication/ReplicationQueuesClient.java    |  25 +-
 .../ReplicationQueuesClientZKImpl.java          |  37 ++
 .../replication/ReplicationQueuesZKImpl.java    |  70 +++
 .../replication/ReplicationStateZKBase.java     |  14 +-
 .../apache/hadoop/hbase/zookeeper/ZKUtil.java   |  24 +-
 .../org/apache/hadoop/hbase/HConstants.java     |  16 +-
 .../MetricsReplicationSinkSource.java           |   2 +
 .../MetricsReplicationSourceSource.java         |   6 +
 .../MetricsReplicationGlobalSourceSource.java   |  21 +
 .../MetricsReplicationSinkSourceImpl.java       |   7 +
 .../MetricsReplicationSourceSourceImpl.java     |  28 +
 .../hbase/protobuf/generated/AdminProtos.java   | 602 +++++++++++++++++--
 hbase-protocol/src/main/protobuf/Admin.proto    |   3 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 152 +++--
 .../hbase/protobuf/ReplicationProtbufUtil.java  |  46 +-
 .../hbase/regionserver/RSRpcServices.java       |   4 +-
 .../regionserver/ReplicationSinkService.java    |   8 +-
 .../regionserver/wal/WALActionsListener.java    |  19 +-
 .../hbase/replication/ScopeWALEntryFilter.java  |  72 ++-
 .../replication/TableCfWALEntryFilter.java      |  76 ++-
 .../master/ReplicationHFileCleaner.java         | 193 ++++++
 .../DefaultSourceFSConfigurationProvider.java   |  78 +++
 .../HBaseInterClusterReplicationEndpoint.java   |  32 +-
 .../regionserver/HFileReplicator.java           | 393 ++++++++++++
 .../replication/regionserver/MetricsSink.java   |  13 +-
 .../replication/regionserver/MetricsSource.java |  31 +
 .../RegionReplicaReplicationEndpoint.java       |   4 +-
 .../replication/regionserver/Replication.java   | 133 +++-
 .../regionserver/ReplicationSink.java           | 200 +++++-
 .../regionserver/ReplicationSource.java         |  92 ++-
 .../ReplicationSourceInterface.java             |  13 +
 .../regionserver/ReplicationSourceManager.java  |  21 +
 .../SourceFSConfigurationProvider.java          |  40 ++
 .../security/access/SecureBulkLoadEndpoint.java |  18 +-
 .../cleaner/TestReplicationHFileCleaner.java    | 264 ++++++++
 .../replication/ReplicationSourceDummy.java     |   8 +
 .../replication/TestMasterReplication.java      | 313 +++++++++-
 .../replication/TestReplicationSmallTests.java  |   3 +-
 .../replication/TestReplicationStateBasic.java  |  57 ++
 .../replication/TestReplicationStateZKImpl.java |   1 +
 .../replication/TestReplicationSyncUpTool.java  |  10 +-
 ...ReplicationSyncUpToolWithBulkLoadedData.java | 235 ++++++++
 .../regionserver/TestReplicationSink.java       | 179 +++++-
 .../TestReplicationSourceManager.java           |  70 ++-
 .../TestSourceFSConfigurationProvider.java      |  25 +
 48 files changed, 3444 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
index 8e80e06..8bf21d5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
@@ -50,7 +50,7 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @param peerConfig configuration for the replication slave cluster
    * @param tableCFs the table and column-family list which will be replicated for this peer or null
-   * for all table and column families
+   *          for all table and column families
    */
   void addPeer(String peerId, ReplicationPeerConfig peerConfig, String tableCFs)
       throws ReplicationException;

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
index 63f9ac3..fd10b66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NoNodeException;
 
 import com.google.protobuf.ByteString;
 
@@ -120,8 +121,21 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
       }
 
       checkQueuesDeleted(id);
-      
+
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+
+      // If bulk load hfile replication is enabled, add a peerId node under the hfile-refs node
+      if (replicationForBulkLoadEnabled) {
+        try {
+          String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+          LOG.info("Adding peer " + peerZnode + " to hfile reference queue.");
+          ZKUtil.createWithParents(this.zookeeper, peerZnode);
+        } catch (KeeperException e) {
+          throw new ReplicationException("Failed to add peer with id=" + id
+              + ", node under hfile references node.", e);
+        }
+      }
+
       List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
       ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(this.peersZNode, id),
         toByteArray(peerConfig));
@@ -151,6 +165,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
             + " because that id does not exist.");
       }
       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+      // Delete peerId node from hfile-refs node irrespective of whether bulk loaded hfile
+      // replication is enabled or not
+
+      String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, id);
+      try {
+        LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
+        ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
+      } catch (NoNodeException e) {
+        LOG.info("Did not find node " + peerZnode + " to delete.", e);
+      }
+      }
     } catch (KeeperException e) {
       throw new ReplicationException("Could not remove peer with id=" + id, e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index 3dbbc33..0d47a88 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -26,7 +26,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 
 /**
  * This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the WALs and HFile references (when hbase.replication.bulkload.enabled is true)
+ * that still need to be replicated to remote clusters.
  */
 @InterfaceAudience.Private
 public interface ReplicationQueues {
@@ -113,4 +114,26 @@ public interface ReplicationQueues {
    * @return if this is this rs's znode
    */
   boolean isThisOurZnode(String znode);
+
+  /**
+   * Add a peer to hfile reference queue if peer does not exist.
+   * @param peerId peer cluster id to be added
+   * @throws ReplicationException if it fails to add a peer id to the hfile reference queue
+   */
+  void addPeerToHFileRefs(String peerId) throws ReplicationException;
+
+  /**
+   * Add new hfile references to the queue.
+   * @param peerId peer cluster id to which the hfiles need to be replicated
+   * @param files list of hfile references to be added
+   * @throws ReplicationException if it fails to add an hfile reference
+   */
+  void addHFileRefs(String peerId, List<String> files) throws ReplicationException;
+
+  /**
+   * Remove hfile references from the queue.
+   * @param peerId peer cluster id from which these hfile references need to be removed
+   * @param files list of hfile references to be removed
+   */
+  void removeHFileRefs(String peerId, List<String> files);
 }
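
To make the new contract concrete, a minimal hypothetical caller of the hfile reference methods declared above could look like the sketch below; the peer id and file names are placeholders, and error handling is reduced to propagating ReplicationException:

import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;

public class HFileRefsQueueExample {
  public void trackBulkLoadedFiles(ReplicationQueues queues) throws ReplicationException {
    String peerId = "2"; // placeholder peer cluster id
    List<String> hfiles = Arrays.asList("f1a2b3c4d5", "e6f7a8b9c0"); // placeholder store file names

    // Make sure the peer has a node under hfile-refs, then enqueue the new references.
    queues.addPeerToHFileRefs(peerId);
    queues.addHFileRefs(peerId, hfiles);

    // Once the peer has copied and bulk loaded the files, the references can be dropped.
    queues.removeHFileRefs(peerId, hfiles);
  }
}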

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
index 5b3e541..7fa3bbb 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
@@ -25,7 +25,8 @@ import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
- * keep track of the WALs that still need to be replicated to remote clusters.
+ * keep track of the sources (WALs/HFile references) that still need to be replicated to remote
+ * clusters.
  */
 @InterfaceAudience.Private
 public interface ReplicationQueuesClient {
@@ -65,4 +66,26 @@ public interface ReplicationQueuesClient {
    * @return cversion of replication rs node
    */
   int getQueuesZNodeCversion() throws KeeperException;
+
+  /**
+   * Get the change version number of the replication hfile references node. This can be used for
+   * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
+   * @return change version number of hfile references node
+   */
+  int getHFileRefsNodeChangeVersion() throws KeeperException;
+
+  /**
+   * Get list of all peers from hfile reference queue.
+   * @return a list of peer ids
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
+
+  /**
+   * Get a list of all hfile references in the given peer.
+   * @param peerId a String that identifies the peer
+   * @return a list of hfile references, null if none found
+   * @throws KeeperException zookeeper exception
+   */
+  List<String> getReplicableHFiles(String peerId) throws KeeperException;
 }
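
A rough sketch of how a read-only client (for example, a cleaner deciding which archived hfiles are still referenced) might combine the three methods above; the retry on a changed cversion is only indicated, not implemented:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.zookeeper.KeeperException;

public class HFileRefsSnapshotExample {
  public Set<String> pendingHFileRefs(ReplicationQueuesClient client) throws KeeperException {
    Set<String> pending = new HashSet<String>();
    int before = client.getHFileRefsNodeChangeVersion();
    List<String> peers = client.getAllPeersFromHFileRefsQueue();
    if (peers != null) {
      for (String peerId : peers) {
        List<String> files = client.getReplicableHFiles(peerId);
        if (files != null) {
          pending.addAll(files);
        }
      }
    }
    if (before != client.getHFileRefsNodeChangeVersion()) {
      // The hfile-refs node changed while we were reading; a real caller would retry here
      // to get a consistent snapshot, as the javadoc above suggests.
    }
    return pending;
  }
}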

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
index e1a6a49..cc407e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
@@ -84,4 +84,41 @@ public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implem
       throw e;
     }
   }
+
+  @Override
+  public int getHFileRefsNodeChangeVersion() throws KeeperException {
+    Stat stat = new Stat();
+    try {
+      ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get stat of replication hfile references node.", e);
+      throw e;
+    }
+    return stat.getCversion();
+  }
+
+  @Override
+  public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
+      throw e;
+    }
+    return result;
+  }
+
+  @Override
+  public List<String> getReplicableHFiles(String peerId) throws KeeperException {
+    String znode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    List<String> result = null;
+    try {
+      result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
+      throw e;
+    }
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 97763e2..43dd412 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -84,6 +84,15 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
     } catch (KeeperException e) {
       throw new ReplicationException("Could not initialize replication queues.", e);
     }
+    // If bulk load hfile replication is enabled, create the hfile-refs znode
+    if (replicationForBulkLoadEnabled) {
+      try {
+        ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
+      } catch (KeeperException e) {
+        throw new ReplicationException("Could not initialize hfile references replication queue.",
+            e);
+      }
+    }
   }
 
   @Override
@@ -431,4 +440,65 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
         ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
     return ProtobufUtil.prependPBMagic(bytes);
   }
+
+  @Override
+  public void addHFileRefs(String peerId, List<String> files) throws ReplicationException {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (debugEnabled) {
+      LOG.debug("Adding hfile references " + files + " in queue " + peerZnode);
+    }
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    int size = files.size();
+    for (int i = 0; i < size; i++) {
+      listOfOps.add(ZKUtilOp.createAndFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i)),
+        HConstants.EMPTY_BYTE_ARRAY));
+    }
+    if (debugEnabled) {
+      LOG.debug("The multi list size for adding hfile references in zk for node " + peerZnode
+          + " is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
+    }
+  }
+
+  @Override
+  public void removeHFileRefs(String peerId, List<String> files) {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    boolean debugEnabled = LOG.isDebugEnabled();
+    if (debugEnabled) {
+      LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
+    }
+    List<ZKUtilOp> listOfOps = new ArrayList<ZKUtil.ZKUtilOp>();
+    int size = files.size();
+    for (int i = 0; i < size; i++) {
+      listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZKUtil.joinZNode(peerZnode, files.get(i))));
+    }
+    if (debugEnabled) {
+      LOG.debug("The multi list size for removing hfile references in zk for node " + peerZnode
+          + " is " + listOfOps.size());
+    }
+    try {
+      ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
+    } catch (KeeperException e) {
+      LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
+    }
+  }
+
+  @Override
+  public void addPeerToHFileRefs(String peerId) throws ReplicationException {
+    String peerZnode = ZKUtil.joinZNode(this.hfileRefsZNode, peerId);
+    try {
+      if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
+        LOG.info("Adding peer " + peerId + " to hfile reference queue.");
+        ZKUtil.createWithParents(this.zookeeper, peerZnode);
+      }
+    } catch (KeeperException e) {
+      throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
+          e);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
index 4fbac0f..762167f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
@@ -20,9 +20,10 @@ package org.apache.hadoop.hbase.replication;
 
 import java.util.List;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.zookeeper.ZKConfig;
@@ -48,32 +49,43 @@ public abstract class ReplicationStateZKBase {
   protected final String peersZNode;
   /** The name of the znode that contains all replication queues */
   protected final String queuesZNode;
+  /** The name of the znode that contains queues of hfile references to be replicated */
+  protected final String hfileRefsZNode;
   /** The cluster key of the local cluster */
   protected final String ourClusterKey;
   protected final ZooKeeperWatcher zookeeper;
   protected final Configuration conf;
   protected final Abortable abortable;
+  protected final boolean replicationForBulkLoadEnabled;
 
   // Public for testing
   public static final byte[] ENABLED_ZNODE_BYTES =
       toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
   public static final byte[] DISABLED_ZNODE_BYTES =
       toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
+      "zookeeper.znode.replication.hfile.refs";
+  public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
 
   public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
       Abortable abortable) {
     this.zookeeper = zookeeper;
     this.conf = conf;
     this.abortable = abortable;
+    this.replicationForBulkLoadEnabled = conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
 
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
     String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
+    String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+      ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
     this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
+    this.hfileRefsZNode = ZKUtil.joinZNode(replicationZNode, hfileRefsZNodeName);
   }
 
   public List<String> getListOfReplicators() {
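
With the default znode names above (and the default zookeeper.znode.parent of /hbase), the hfile reference tracking znodes end up laid out roughly as follows; the peer id 2 and the hfile name are made-up examples:

  /hbase/replication/hfile-refs                <- hfileRefsZNode, created when bulk load replication is enabled
  /hbase/replication/hfile-refs/2              <- one child per peer (addPeerToHFileRefs)
  /hbase/replication/hfile-refs/2/f1a2b3c4d5   <- one child per queued hfile reference (addHFileRefs)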

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
index c268268..9e01d09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
 import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -884,7 +885,7 @@ public class ZKUtil {
               JaasConfiguration.SERVER_KEYTAB_KERBEROS_CONFIG_NAME) == null
           && conf.get(HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL) == null
           && conf.get(HConstants.ZK_SERVER_KERBEROS_PRINCIPAL) == null) {
-              
+
         return false;
       }
     } catch(Exception e) {
@@ -1797,6 +1798,27 @@ public class ZKUtil {
       } else if (child.equals(zkw.getConfiguration().
           get("zookeeper.znode.replication.rs", "rs"))) {
         appendRSZnodes(zkw, znode, sb);
+      } else if (child.equals(zkw.getConfiguration().get(
+        ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
+        ReplicationStateZKBase.ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT))) {
+        appendHFileRefsZnodes(zkw, znode, sb);
+      }
+    }
+  }
+
+  private static void appendHFileRefsZnodes(ZooKeeperWatcher zkw, String hfileRefsZnode,
+      StringBuilder sb) throws KeeperException {
+    sb.append("\n").append(hfileRefsZnode).append(": ");
+    for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) {
+      String znodeToProcess = ZKUtil.joinZNode(hfileRefsZnode, peerIdZnode);
+      sb.append("\n").append(znodeToProcess).append(": ");
+      List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess);
+      int size = peerHFileRefsZnodes.size();
+      for (int i = 0; i < size; i++) {
+        sb.append(peerHFileRefsZnodes.get(i));
+        if (i != size - 1) {
+          sb.append(", ");
+        }
       }
     }
   }
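
For reference, with a single peer id 2 holding two queued references, the fragment appended by appendHFileRefsZnodes would look roughly like this (the hfile names are invented):

  /hbase/replication/hfile-refs:
  /hbase/replication/hfile-refs/2: f1a2b3c4d5, e6f7a8b9c0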

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index ac57514..6fafad3 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -850,6 +850,18 @@ public final class HConstants {
       REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
   public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
     "org.apache.hadoop.hbase.replication.regionserver.Replication";
+  public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
+  public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
+  /** Replication cluster id of the source cluster, which uniquely identifies it to the peer cluster */
+  public static final String REPLICATION_CLUSTER_ID = "hbase.replication.cluster.id";
+  /**
+   * Directory where the source cluster's file system client configuration is placed, which the
+   * sink cluster uses to copy HFiles from the source cluster's file system
+   */
+  public static final String REPLICATION_CONF_DIR = "hbase.replication.conf.dir";
+
+  /** Maximum time to retry for a failed bulk load request */
+  public static final String BULKLOAD_MAX_RETRIES_NUMBER = "hbase.bulkload.retries.number";
 
   /** HBCK special code name used as server name when manipulating ZK nodes */
   public static final String HBCK_CODE_NAME = "HBCKServerName";
@@ -1241,7 +1253,7 @@ public final class HConstants {
 
   public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
       "hbase.canary.write.table.check.period";
-  
+
   /**
    * Configuration keys for programmatic JAAS configuration for secured ZK interaction
    */
@@ -1250,7 +1262,7 @@ public final class HConstants {
       "hbase.zookeeper.client.kerberos.principal";
   public static final String ZK_SERVER_KEYTAB_FILE = "hbase.zookeeper.server.keytab.file";
   public static final String ZK_SERVER_KERBEROS_PRINCIPAL =
-      "hbase.zookeeper.server.kerberos.principal";  
+      "hbase.zookeeper.server.kerberos.principal";
 
   private HConstants() {
     // Can't be instantiated with this ctor.
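
A small hypothetical sketch of how these new keys fit together; the cluster id "source1" and the /etc/hbase/replication-conf path are illustrative values, not defaults shipped with HBase:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;

public class BulkLoadReplicationConfExample {
  /** Settings a source cluster would carry to ship bulk loaded hfiles to its peers. */
  public static Configuration sourceSide() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true); // hbase.replication.bulkload.enabled
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "source1");            // hbase.replication.cluster.id
    return conf;
  }

  /** Settings a peer cluster would carry so its sink can copy hfiles from the source file system. */
  public static Configuration peerSide() {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    conf.set(HConstants.REPLICATION_CONF_DIR, "/etc/hbase/replication-conf"); // holds <cluster id>/*.xml
    return conf;
  }
}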

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 698a59a..9fb8415 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -22,9 +22,11 @@ public interface MetricsReplicationSinkSource {
   public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
   public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
   public static final String SINK_APPLIED_OPS = "sink.appliedOps";
+  public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
 
   void setLastAppliedOpAge(long age);
   void incrAppliedBatches(long batches);
   void incrAppliedOps(long batchsize);
   long getLastAppliedOpAge();
+  void incrAppliedHFiles(long hfileSize);
 }
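
A hypothetical caller of the new sink counter: after applying the hfiles named in a bulk load entry, a sink would bump sink.appliedHFiles by the number of files, alongside the existing counters (the wrapper class and method names below are made up):

import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSinkSource;

public class SinkMetricsExample {
  private final MetricsReplicationSinkSource sinkMetrics;

  public SinkMetricsExample(MetricsReplicationSinkSource sinkMetrics) {
    this.sinkMetrics = sinkMetrics;
  }

  public void onBulkLoadApplied(int hfileCount, long entryWriteTime) {
    sinkMetrics.incrAppliedBatches(1);
    sinkMetrics.incrAppliedOps(1);
    sinkMetrics.incrAppliedHFiles(hfileCount); // new counter introduced by this change
    sinkMetrics.setLastAppliedOpAge(System.currentTimeMillis() - entryWriteTime);
  }
}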

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index fecf191..188c3a3 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -32,6 +32,9 @@ public interface MetricsReplicationSourceSource {
 
   public static final String SOURCE_LOG_EDITS_FILTERED = "source.logEditsFiltered";
 
+  public static final String SOURCE_SHIPPED_HFILES = "source.shippedHFiles";
+  public static final String SOURCE_SIZE_OF_HFILE_REFS_QUEUE = "source.sizeOfHFileRefsQueue";
+
   void setLastShippedAge(long age);
   void setSizeOfLogQueue(int size);
   void incrSizeOfLogQueue(int size);
@@ -44,4 +47,7 @@ public interface MetricsReplicationSourceSource {
   void incrLogReadInEdits(long size);
   void clear();
   long getLastShippedAge();
+  void incrHFilesShipped(long hfiles);
+  void incrSizeOfHFileRefsQueue(long size);
+  void decrSizeOfHFileRefsQueue(long size);
 }
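
Similarly, a hypothetical source-side caller would grow source.sizeOfHFileRefsQueue when references are enqueued and shrink it again once they ship (the wrapper class and method names are made up):

import org.apache.hadoop.hbase.replication.regionserver.MetricsReplicationSourceSource;

public class SourceMetricsExample {
  private final MetricsReplicationSourceSource sourceMetrics;

  public SourceMetricsExample(MetricsReplicationSourceSource sourceMetrics) {
    this.sourceMetrics = sourceMetrics;
  }

  public void onHFileRefsEnqueued(int count) {
    sourceMetrics.incrSizeOfHFileRefsQueue(count);
  }

  public void onHFilesShipped(int count) {
    sourceMetrics.incrHFilesShipped(count);
    sourceMetrics.decrSizeOfHFileRefsQueue(count);
  }
}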

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
index 6dace10..392cd39 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSource.java
@@ -32,6 +32,8 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   private final MutableCounterLong shippedOpsCounter;
   private final MutableCounterLong shippedKBsCounter;
   private final MutableCounterLong logReadInBytesCounter;
+  private final MutableCounterLong shippedHFilesCounter;
+  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
 
   public MetricsReplicationGlobalSourceSource(MetricsReplicationSourceImpl rms) {
     this.rms = rms;
@@ -51,6 +53,11 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
     logReadInEditsCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_READ_IN_EDITS, 0L);
 
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_LOG_EDITS_FILTERED, 0L);
+
+    shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(SOURCE_SHIPPED_HFILES, 0L);
+
+    sizeOfHFileRefsQueueGauge =
+        rms.getMetricsRegistry().getLongGauge(SOURCE_SIZE_OF_HFILE_REFS_QUEUE, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -100,4 +107,18 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override public void incrHFilesShipped(long hfiles) {
+    shippedHFilesCounter.incr(hfiles);
+  }
+
+  @Override
+  public void incrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.incr(size);
+  }
+
+  @Override
+  public void decrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.decr(size);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index 14212ba..8f4a337 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -26,11 +26,13 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
   private final MutableGaugeLong ageGauge;
   private final MutableCounterLong batchesCounter;
   private final MutableCounterLong opsCounter;
+  private final MutableCounterLong hfilesCounter;
 
   public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
     ageGauge = rms.getMetricsRegistry().getLongGauge(SINK_AGE_OF_LAST_APPLIED_OP, 0L);
     batchesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_BATCHES, 0L);
     opsCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_OPS, 0L);
+    hfilesCounter = rms.getMetricsRegistry().getLongCounter(SINK_APPLIED_HFILES, 0L);
   }
 
   @Override public void setLastAppliedOpAge(long age) {
@@ -49,4 +51,9 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
   public long getLastAppliedOpAge() {
     return ageGauge.value();
   }
+
+  @Override
+  public void incrAppliedHFiles(long hfiles) {
+    hfilesCounter.incr(hfiles);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
----------------------------------------------------------------------
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 1422e7e..217cc3e 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -32,6 +32,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String shippedOpsKey;
   private final String shippedKBsKey;
   private final String logReadInBytesKey;
+  private final String shippedHFilesKey;
+  private final String sizeOfHFileRefsQueueKey;
 
   private final MutableGaugeLong ageOfLastShippedOpGauge;
   private final MutableGaugeLong sizeOfLogQueueGauge;
@@ -41,6 +43,8 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableCounterLong shippedOpsCounter;
   private final MutableCounterLong shippedKBsCounter;
   private final MutableCounterLong logReadInBytesCounter;
+  private final MutableCounterLong shippedHFilesCounter;
+  private final MutableGaugeLong sizeOfHFileRefsQueueGauge;
 
   public MetricsReplicationSourceSourceImpl(MetricsReplicationSourceImpl rms, String id) {
     this.rms = rms;
@@ -69,6 +73,12 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
 
     logEditsFilteredKey = "source." + id + ".logEditsFiltered";
     logEditsFilteredCounter = rms.getMetricsRegistry().getLongCounter(logEditsFilteredKey, 0L);
+
+    shippedHFilesKey = "source." + this.id + ".shippedHFiles";
+    shippedHFilesCounter = rms.getMetricsRegistry().getLongCounter(shippedHFilesKey, 0L);
+
+    sizeOfHFileRefsQueueKey = "source." + id + ".sizeOfHFileRefsQueue";
+    sizeOfHFileRefsQueueGauge = rms.getMetricsRegistry().getLongGauge(sizeOfHFileRefsQueueKey, 0L);
   }
 
   @Override public void setLastShippedAge(long age) {
@@ -124,10 +134,28 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(logReadInEditsKey);
 
     rms.removeMetric(logEditsFilteredKey);
+
+    rms.removeMetric(shippedHFilesKey);
+    rms.removeMetric(sizeOfHFileRefsQueueKey);
   }
 
   @Override
   public long getLastShippedAge() {
     return ageOfLastShippedOpGauge.value();
   }
+
+  @Override
+  public void incrHFilesShipped(long hfiles) {
+    shippedHFilesCounter.incr(hfiles);
+  }
+
+  @Override
+  public void incrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.incr(size);
+  }
+
+  @Override
+  public void decrSizeOfHFileRefsQueue(long size) {
+    sizeOfHFileRefsQueueGauge.decr(size);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
index b4c378b..1c59ea6 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
@@ -16896,6 +16896,51 @@ public final class AdminProtos {
      */
     org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntryOrBuilder getEntryOrBuilder(
         int index);
+
+    // optional string replicationClusterId = 2;
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    boolean hasReplicationClusterId();
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    java.lang.String getReplicationClusterId();
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    com.google.protobuf.ByteString
+        getReplicationClusterIdBytes();
+
+    // optional string sourceBaseNamespaceDirPath = 3;
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    boolean hasSourceBaseNamespaceDirPath();
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    java.lang.String getSourceBaseNamespaceDirPath();
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    com.google.protobuf.ByteString
+        getSourceBaseNamespaceDirPathBytes();
+
+    // optional string sourceHFileArchiveDirPath = 4;
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    boolean hasSourceHFileArchiveDirPath();
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    java.lang.String getSourceHFileArchiveDirPath();
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    com.google.protobuf.ByteString
+        getSourceHFileArchiveDirPathBytes();
   }
   /**
    * Protobuf type {@code hbase.pb.ReplicateWALEntryRequest}
@@ -16963,6 +17008,21 @@ public final class AdminProtos {
               entry_.add(input.readMessage(org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.PARSER, extensionRegistry));
               break;
             }
+            case 18: {
+              bitField0_ |= 0x00000001;
+              replicationClusterId_ = input.readBytes();
+              break;
+            }
+            case 26: {
+              bitField0_ |= 0x00000002;
+              sourceBaseNamespaceDirPath_ = input.readBytes();
+              break;
+            }
+            case 34: {
+              bitField0_ |= 0x00000004;
+              sourceHFileArchiveDirPath_ = input.readBytes();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -17005,6 +17065,7 @@ public final class AdminProtos {
       return PARSER;
     }
 
+    private int bitField0_;
     // repeated .hbase.pb.WALEntry entry = 1;
     public static final int ENTRY_FIELD_NUMBER = 1;
     private java.util.List<org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry> entry_;
@@ -17041,8 +17102,140 @@ public final class AdminProtos {
       return entry_.get(index);
     }
 
+    // optional string replicationClusterId = 2;
+    public static final int REPLICATIONCLUSTERID_FIELD_NUMBER = 2;
+    private java.lang.Object replicationClusterId_;
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    public boolean hasReplicationClusterId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    public java.lang.String getReplicationClusterId() {
+      java.lang.Object ref = replicationClusterId_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          replicationClusterId_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string replicationClusterId = 2;</code>
+     */
+    public com.google.protobuf.ByteString
+        getReplicationClusterIdBytes() {
+      java.lang.Object ref = replicationClusterId_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        replicationClusterId_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional string sourceBaseNamespaceDirPath = 3;
+    public static final int SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER = 3;
+    private java.lang.Object sourceBaseNamespaceDirPath_;
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    public boolean hasSourceBaseNamespaceDirPath() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    public java.lang.String getSourceBaseNamespaceDirPath() {
+      java.lang.Object ref = sourceBaseNamespaceDirPath_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          sourceBaseNamespaceDirPath_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+     */
+    public com.google.protobuf.ByteString
+        getSourceBaseNamespaceDirPathBytes() {
+      java.lang.Object ref = sourceBaseNamespaceDirPath_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        sourceBaseNamespaceDirPath_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
+    // optional string sourceHFileArchiveDirPath = 4;
+    public static final int SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER = 4;
+    private java.lang.Object sourceHFileArchiveDirPath_;
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    public boolean hasSourceHFileArchiveDirPath() {
+      return ((bitField0_ & 0x00000004) == 0x00000004);
+    }
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    public java.lang.String getSourceHFileArchiveDirPath() {
+      java.lang.Object ref = sourceHFileArchiveDirPath_;
+      if (ref instanceof java.lang.String) {
+        return (java.lang.String) ref;
+      } else {
+        com.google.protobuf.ByteString bs = 
+            (com.google.protobuf.ByteString) ref;
+        java.lang.String s = bs.toStringUtf8();
+        if (bs.isValidUtf8()) {
+          sourceHFileArchiveDirPath_ = s;
+        }
+        return s;
+      }
+    }
+    /**
+     * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+     */
+    public com.google.protobuf.ByteString
+        getSourceHFileArchiveDirPathBytes() {
+      java.lang.Object ref = sourceHFileArchiveDirPath_;
+      if (ref instanceof java.lang.String) {
+        com.google.protobuf.ByteString b = 
+            com.google.protobuf.ByteString.copyFromUtf8(
+                (java.lang.String) ref);
+        sourceHFileArchiveDirPath_ = b;
+        return b;
+      } else {
+        return (com.google.protobuf.ByteString) ref;
+      }
+    }
+
     private void initFields() {
       entry_ = java.util.Collections.emptyList();
+      replicationClusterId_ = "";
+      sourceBaseNamespaceDirPath_ = "";
+      sourceHFileArchiveDirPath_ = "";
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -17065,6 +17258,15 @@ public final class AdminProtos {
       for (int i = 0; i < entry_.size(); i++) {
         output.writeMessage(1, entry_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(2, getReplicationClusterIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeBytes(3, getSourceBaseNamespaceDirPathBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        output.writeBytes(4, getSourceHFileArchiveDirPathBytes());
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -17078,6 +17280,18 @@ public final class AdminProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(1, entry_.get(i));
       }
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, getReplicationClusterIdBytes());
+      }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(3, getSourceBaseNamespaceDirPathBytes());
+      }
+      if (((bitField0_ & 0x00000004) == 0x00000004)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(4, getSourceHFileArchiveDirPathBytes());
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -17103,6 +17317,21 @@ public final class AdminProtos {
       boolean result = true;
       result = result && getEntryList()
           .equals(other.getEntryList());
+      result = result && (hasReplicationClusterId() == other.hasReplicationClusterId());
+      if (hasReplicationClusterId()) {
+        result = result && getReplicationClusterId()
+            .equals(other.getReplicationClusterId());
+      }
+      result = result && (hasSourceBaseNamespaceDirPath() == other.hasSourceBaseNamespaceDirPath());
+      if (hasSourceBaseNamespaceDirPath()) {
+        result = result && getSourceBaseNamespaceDirPath()
+            .equals(other.getSourceBaseNamespaceDirPath());
+      }
+      result = result && (hasSourceHFileArchiveDirPath() == other.hasSourceHFileArchiveDirPath());
+      if (hasSourceHFileArchiveDirPath()) {
+        result = result && getSourceHFileArchiveDirPath()
+            .equals(other.getSourceHFileArchiveDirPath());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -17120,6 +17349,18 @@ public final class AdminProtos {
         hash = (37 * hash) + ENTRY_FIELD_NUMBER;
         hash = (53 * hash) + getEntryList().hashCode();
       }
+      if (hasReplicationClusterId()) {
+        hash = (37 * hash) + REPLICATIONCLUSTERID_FIELD_NUMBER;
+        hash = (53 * hash) + getReplicationClusterId().hashCode();
+      }
+      if (hasSourceBaseNamespaceDirPath()) {
+        hash = (37 * hash) + SOURCEBASENAMESPACEDIRPATH_FIELD_NUMBER;
+        hash = (53 * hash) + getSourceBaseNamespaceDirPath().hashCode();
+      }
+      if (hasSourceHFileArchiveDirPath()) {
+        hash = (37 * hash) + SOURCEHFILEARCHIVEDIRPATH_FIELD_NUMBER;
+        hash = (53 * hash) + getSourceHFileArchiveDirPath().hashCode();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -17243,6 +17484,12 @@ public final class AdminProtos {
         } else {
           entryBuilder_.clear();
         }
+        replicationClusterId_ = "";
+        bitField0_ = (bitField0_ & ~0x00000002);
+        sourceBaseNamespaceDirPath_ = "";
+        bitField0_ = (bitField0_ & ~0x00000004);
+        sourceHFileArchiveDirPath_ = "";
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
 
@@ -17270,6 +17517,7 @@ public final class AdminProtos {
       public org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest buildPartial() {
         org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest result = new org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryRequest(this);
         int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
         if (entryBuilder_ == null) {
           if (((bitField0_ & 0x00000001) == 0x00000001)) {
             entry_ = java.util.Collections.unmodifiableList(entry_);
@@ -17279,6 +17527,19 @@ public final class AdminProtos {
         } else {
           result.entry_ = entryBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.replicationClusterId_ = replicationClusterId_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.sourceBaseNamespaceDirPath_ = sourceBaseNamespaceDirPath_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000004;
+        }
+        result.sourceHFileArchiveDirPath_ = sourceHFileArchiveDirPath_;
+        result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
       }
@@ -17320,6 +17581,21 @@ public final class AdminProtos {
             }
           }
         }
+        if (other.hasReplicationClusterId()) {
+          bitField0_ |= 0x00000002;
+          replicationClusterId_ = other.replicationClusterId_;
+          onChanged();
+        }
+        if (other.hasSourceBaseNamespaceDirPath()) {
+          bitField0_ |= 0x00000004;
+          sourceBaseNamespaceDirPath_ = other.sourceBaseNamespaceDirPath_;
+          onChanged();
+        }
+        if (other.hasSourceHFileArchiveDirPath()) {
+          bitField0_ |= 0x00000008;
+          sourceHFileArchiveDirPath_ = other.sourceHFileArchiveDirPath_;
+          onChanged();
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -17593,6 +17869,228 @@ public final class AdminProtos {
         return entryBuilder_;
       }
 
+      // optional string replicationClusterId = 2;
+      private java.lang.Object replicationClusterId_ = "";
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public boolean hasReplicationClusterId() {
+        return ((bitField0_ & 0x00000002) == 0x00000002);
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public java.lang.String getReplicationClusterId() {
+        java.lang.Object ref = replicationClusterId_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          replicationClusterId_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public com.google.protobuf.ByteString
+          getReplicationClusterIdBytes() {
+        java.lang.Object ref = replicationClusterId_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          replicationClusterId_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public Builder setReplicationClusterId(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationClusterId_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public Builder clearReplicationClusterId() {
+        bitField0_ = (bitField0_ & ~0x00000002);
+        replicationClusterId_ = getDefaultInstance().getReplicationClusterId();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string replicationClusterId = 2;</code>
+       */
+      public Builder setReplicationClusterIdBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000002;
+        replicationClusterId_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional string sourceBaseNamespaceDirPath = 3;
+      private java.lang.Object sourceBaseNamespaceDirPath_ = "";
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public boolean hasSourceBaseNamespaceDirPath() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public java.lang.String getSourceBaseNamespaceDirPath() {
+        java.lang.Object ref = sourceBaseNamespaceDirPath_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          sourceBaseNamespaceDirPath_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public com.google.protobuf.ByteString
+          getSourceBaseNamespaceDirPathBytes() {
+        java.lang.Object ref = sourceBaseNamespaceDirPath_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          sourceBaseNamespaceDirPath_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public Builder setSourceBaseNamespaceDirPath(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        sourceBaseNamespaceDirPath_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public Builder clearSourceBaseNamespaceDirPath() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        sourceBaseNamespaceDirPath_ = getDefaultInstance().getSourceBaseNamespaceDirPath();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceBaseNamespaceDirPath = 3;</code>
+       */
+      public Builder setSourceBaseNamespaceDirPathBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000004;
+        sourceBaseNamespaceDirPath_ = value;
+        onChanged();
+        return this;
+      }
+
+      // optional string sourceHFileArchiveDirPath = 4;
+      private java.lang.Object sourceHFileArchiveDirPath_ = "";
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public boolean hasSourceHFileArchiveDirPath() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public java.lang.String getSourceHFileArchiveDirPath() {
+        java.lang.Object ref = sourceHFileArchiveDirPath_;
+        if (!(ref instanceof java.lang.String)) {
+          java.lang.String s = ((com.google.protobuf.ByteString) ref)
+              .toStringUtf8();
+          sourceHFileArchiveDirPath_ = s;
+          return s;
+        } else {
+          return (java.lang.String) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public com.google.protobuf.ByteString
+          getSourceHFileArchiveDirPathBytes() {
+        java.lang.Object ref = sourceHFileArchiveDirPath_;
+        if (ref instanceof String) {
+          com.google.protobuf.ByteString b = 
+              com.google.protobuf.ByteString.copyFromUtf8(
+                  (java.lang.String) ref);
+          sourceHFileArchiveDirPath_ = b;
+          return b;
+        } else {
+          return (com.google.protobuf.ByteString) ref;
+        }
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public Builder setSourceHFileArchiveDirPath(
+          java.lang.String value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        sourceHFileArchiveDirPath_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public Builder clearSourceHFileArchiveDirPath() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        sourceHFileArchiveDirPath_ = getDefaultInstance().getSourceHFileArchiveDirPath();
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional string sourceHFileArchiveDirPath = 4;</code>
+       */
+      public Builder setSourceHFileArchiveDirPathBytes(
+          com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000008;
+        sourceHFileArchiveDirPath_ = value;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.ReplicateWALEntryRequest)
     }
 
@@ -23539,56 +24037,58 @@ public final class AdminProtos {
       "ster_system_time\030\004 \001(\004\"\026\n\024MergeRegionsRe" +
       "sponse\"a\n\010WALEntry\022\035\n\003key\030\001 \002(\0132\020.hbase." +
       "pb.WALKey\022\027\n\017key_value_bytes\030\002 \003(\014\022\035\n\025as",
-      "sociated_cell_count\030\003 \001(\005\"=\n\030ReplicateWA" +
-      "LEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb." +
-      "WALEntry\"\033\n\031ReplicateWALEntryResponse\"\026\n" +
-      "\024RollWALWriterRequest\"0\n\025RollWALWriterRe" +
-      "sponse\022\027\n\017region_to_flush\030\001 \003(\014\"#\n\021StopS" +
-      "erverRequest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServ" +
-      "erResponse\"\026\n\024GetServerInfoRequest\"K\n\nSe" +
-      "rverInfo\022)\n\013server_name\030\001 \002(\0132\024.hbase.pb" +
-      ".ServerName\022\022\n\nwebui_port\030\002 \001(\r\"B\n\025GetSe" +
-      "rverInfoResponse\022)\n\013server_info\030\001 \002(\0132\024.",
-      "hbase.pb.ServerInfo\"\034\n\032UpdateConfigurati" +
-      "onRequest\"\035\n\033UpdateConfigurationResponse" +
-      "2\207\013\n\014AdminService\022P\n\rGetRegionInfo\022\036.hba" +
-      "se.pb.GetRegionInfoRequest\032\037.hbase.pb.Ge" +
-      "tRegionInfoResponse\022M\n\014GetStoreFile\022\035.hb" +
-      "ase.pb.GetStoreFileRequest\032\036.hbase.pb.Ge" +
-      "tStoreFileResponse\022V\n\017GetOnlineRegion\022 ." +
-      "hbase.pb.GetOnlineRegionRequest\032!.hbase." +
-      "pb.GetOnlineRegionResponse\022G\n\nOpenRegion" +
-      "\022\033.hbase.pb.OpenRegionRequest\032\034.hbase.pb",
-      ".OpenRegionResponse\022M\n\014WarmupRegion\022\035.hb" +
-      "ase.pb.WarmupRegionRequest\032\036.hbase.pb.Wa" +
-      "rmupRegionResponse\022J\n\013CloseRegion\022\034.hbas" +
-      "e.pb.CloseRegionRequest\032\035.hbase.pb.Close" +
-      "RegionResponse\022J\n\013FlushRegion\022\034.hbase.pb" +
-      ".FlushRegionRequest\032\035.hbase.pb.FlushRegi" +
-      "onResponse\022J\n\013SplitRegion\022\034.hbase.pb.Spl" +
-      "itRegionRequest\032\035.hbase.pb.SplitRegionRe" +
-      "sponse\022P\n\rCompactRegion\022\036.hbase.pb.Compa" +
-      "ctRegionRequest\032\037.hbase.pb.CompactRegion",
-      "Response\022M\n\014MergeRegions\022\035.hbase.pb.Merg" +
-      "eRegionsRequest\032\036.hbase.pb.MergeRegionsR" +
-      "esponse\022\\\n\021ReplicateWALEntry\022\".hbase.pb." +
-      "ReplicateWALEntryRequest\032#.hbase.pb.Repl" +
-      "icateWALEntryResponse\022Q\n\006Replay\022\".hbase." +
-      "pb.ReplicateWALEntryRequest\032#.hbase.pb.R" +
-      "eplicateWALEntryResponse\022P\n\rRollWALWrite" +
-      "r\022\036.hbase.pb.RollWALWriterRequest\032\037.hbas" +
-      "e.pb.RollWALWriterResponse\022P\n\rGetServerI" +
-      "nfo\022\036.hbase.pb.GetServerInfoRequest\032\037.hb",
-      "ase.pb.GetServerInfoResponse\022G\n\nStopServ" +
-      "er\022\033.hbase.pb.StopServerRequest\032\034.hbase." +
-      "pb.StopServerResponse\022_\n\022UpdateFavoredNo" +
-      "des\022#.hbase.pb.UpdateFavoredNodesRequest" +
-      "\032$.hbase.pb.UpdateFavoredNodesResponse\022b" +
-      "\n\023UpdateConfiguration\022$.hbase.pb.UpdateC" +
-      "onfigurationRequest\032%.hbase.pb.UpdateCon" +
-      "figurationResponseBA\n*org.apache.hadoop." +
-      "hbase.protobuf.generatedB\013AdminProtosH\001\210" +
-      "\001\001\240\001\001"
+      "sociated_cell_count\030\003 \001(\005\"\242\001\n\030ReplicateW" +
+      "ALEntryRequest\022!\n\005entry\030\001 \003(\0132\022.hbase.pb" +
+      ".WALEntry\022\034\n\024replicationClusterId\030\002 \001(\t\022" +
+      "\"\n\032sourceBaseNamespaceDirPath\030\003 \001(\t\022!\n\031s" +
+      "ourceHFileArchiveDirPath\030\004 \001(\t\"\033\n\031Replic" +
+      "ateWALEntryResponse\"\026\n\024RollWALWriterRequ" +
+      "est\"0\n\025RollWALWriterResponse\022\027\n\017region_t" +
+      "o_flush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006re" +
+      "ason\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetS" +
+      "erverInfoRequest\"K\n\nServerInfo\022)\n\013server",
+      "_name\030\001 \002(\0132\024.hbase.pb.ServerName\022\022\n\nweb" +
+      "ui_port\030\002 \001(\r\"B\n\025GetServerInfoResponse\022)" +
+      "\n\013server_info\030\001 \002(\0132\024.hbase.pb.ServerInf" +
+      "o\"\034\n\032UpdateConfigurationRequest\"\035\n\033Updat" +
+      "eConfigurationResponse2\207\013\n\014AdminService\022" +
+      "P\n\rGetRegionInfo\022\036.hbase.pb.GetRegionInf" +
+      "oRequest\032\037.hbase.pb.GetRegionInfoRespons" +
+      "e\022M\n\014GetStoreFile\022\035.hbase.pb.GetStoreFil" +
+      "eRequest\032\036.hbase.pb.GetStoreFileResponse" +
+      "\022V\n\017GetOnlineRegion\022 .hbase.pb.GetOnline",
+      "RegionRequest\032!.hbase.pb.GetOnlineRegion" +
+      "Response\022G\n\nOpenRegion\022\033.hbase.pb.OpenRe" +
+      "gionRequest\032\034.hbase.pb.OpenRegionRespons" +
+      "e\022M\n\014WarmupRegion\022\035.hbase.pb.WarmupRegio" +
+      "nRequest\032\036.hbase.pb.WarmupRegionResponse" +
+      "\022J\n\013CloseRegion\022\034.hbase.pb.CloseRegionRe" +
+      "quest\032\035.hbase.pb.CloseRegionResponse\022J\n\013" +
+      "FlushRegion\022\034.hbase.pb.FlushRegionReques" +
+      "t\032\035.hbase.pb.FlushRegionResponse\022J\n\013Spli" +
+      "tRegion\022\034.hbase.pb.SplitRegionRequest\032\035.",
+      "hbase.pb.SplitRegionResponse\022P\n\rCompactR" +
+      "egion\022\036.hbase.pb.CompactRegionRequest\032\037." +
+      "hbase.pb.CompactRegionResponse\022M\n\014MergeR" +
+      "egions\022\035.hbase.pb.MergeRegionsRequest\032\036." +
+      "hbase.pb.MergeRegionsResponse\022\\\n\021Replica" +
+      "teWALEntry\022\".hbase.pb.ReplicateWALEntryR" +
+      "equest\032#.hbase.pb.ReplicateWALEntryRespo" +
+      "nse\022Q\n\006Replay\022\".hbase.pb.ReplicateWALEnt" +
+      "ryRequest\032#.hbase.pb.ReplicateWALEntryRe" +
+      "sponse\022P\n\rRollWALWriter\022\036.hbase.pb.RollW",
+      "ALWriterRequest\032\037.hbase.pb.RollWALWriter" +
+      "Response\022P\n\rGetServerInfo\022\036.hbase.pb.Get" +
+      "ServerInfoRequest\032\037.hbase.pb.GetServerIn" +
+      "foResponse\022G\n\nStopServer\022\033.hbase.pb.Stop" +
+      "ServerRequest\032\034.hbase.pb.StopServerRespo" +
+      "nse\022_\n\022UpdateFavoredNodes\022#.hbase.pb.Upd" +
+      "ateFavoredNodesRequest\032$.hbase.pb.Update" +
+      "FavoredNodesResponse\022b\n\023UpdateConfigurat" +
+      "ion\022$.hbase.pb.UpdateConfigurationReques" +
+      "t\032%.hbase.pb.UpdateConfigurationResponse",
+      "BA\n*org.apache.hadoop.hbase.protobuf.gen" +
+      "eratedB\013AdminProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -23750,7 +24250,7 @@ public final class AdminProtos {
           internal_static_hbase_pb_ReplicateWALEntryRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_ReplicateWALEntryRequest_descriptor,
-              new java.lang.String[] { "Entry", });
+              new java.lang.String[] { "Entry", "ReplicationClusterId", "SourceBaseNamespaceDirPath", "SourceHFileArchiveDirPath", });
           internal_static_hbase_pb_ReplicateWALEntryResponse_descriptor =
             getDescriptor().getMessageTypes().get(24);
           internal_static_hbase_pb_ReplicateWALEntryResponse_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-protocol/src/main/protobuf/Admin.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Admin.proto b/hbase-protocol/src/main/protobuf/Admin.proto
index f7787f5..a1905a4 100644
--- a/hbase-protocol/src/main/protobuf/Admin.proto
+++ b/hbase-protocol/src/main/protobuf/Admin.proto
@@ -211,6 +211,9 @@ message WALEntry {
  */
 message ReplicateWALEntryRequest {
   repeated WALEntry entry = 1;
+  optional string replicationClusterId = 2;
+  optional string sourceBaseNamespaceDirPath = 3;
+  optional string sourceHFileArchiveDirPath = 4;
 }
 
 message ReplicateWALEntryResponse {
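
As a quick illustration of the wire change above, a hedged sketch of building a request that carries the three new optional fields through the generated Java API; the cluster id and paths are placeholders only:

    // Sketch only: placeholder values; WAL entries are still added via addEntry(...) as before.
    AdminProtos.ReplicateWALEntryRequest.Builder builder =
        AdminProtos.ReplicateWALEntryRequest.newBuilder();
    builder.setReplicationClusterId("source-cluster-id");        // keys source FS config on the sink
    builder.setSourceBaseNamespaceDirPath("/hbase/data");        // source base namespace directory
    builder.setSourceHFileArchiveDirPath("/hbase/archive/data"); // source hfile archive directory
    AdminProtos.ReplicateWALEntryRequest request = builder.build();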

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 44be2d3..369ae90 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
@@ -125,6 +126,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private FsDelegationToken fsDelegationToken;
   private String bulkToken;
   private UserProvider userProvider;
+  private int nrThreads;
 
   private LoadIncrementalHFiles() {}
 
@@ -146,6 +148,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+      Runtime.getRuntime().availableProcessors());
     initalized = true;
   }
 
@@ -246,7 +250,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * region boundary, and each part is added back into the queue.
    * The import process finishes when the queue is empty.
    */
-  static class LoadQueueItem {
+  public static class LoadQueueItem {
     final byte[] family;
     final Path hfilePath;
 
@@ -313,7 +317,6 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param table the table to load into
    * @throws TableNotFoundException if table does not yet exist
    */
-  @SuppressWarnings("deprecation")
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator) throws TableNotFoundException, IOException  {
 
@@ -321,16 +324,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       throw new TableNotFoundException("Table " + table.getName() + "is not currently available.");
     }
 
-    // initialize thread pools
-    int nrThreads = getConf().getInt("hbase.loadincremental.threads.max",
-      Runtime.getRuntime().availableProcessors());
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
-    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads,
-        60, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<Runnable>(),
-        builder.build());
-    ((ThreadPoolExecutor)pool).allowCoreThreadTimeOut(true);
+    ExecutorService pool = createExecutorService();
 
     // LQI queue does not need to be threadsafe -- all operations on this queue
     // happen in this thread
@@ -347,30 +341,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 	    "option, consider removing the files and bulkload again without this option. " +
 	    "See HBASE-13985");
       }
-      discoverLoadQueue(queue, hfofDir, validateHFile);
-      // check whether there is invalid family name in HFiles to be bulkloaded
-      Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
-      ArrayList<String> familyNames = new ArrayList<String>(families.size());
-      for (HColumnDescriptor family : families) {
-        familyNames.add(family.getNameAsString());
-      }
-      ArrayList<String> unmatchedFamilies = new ArrayList<String>();
-      Iterator<LoadQueueItem> queueIter = queue.iterator();
-      while (queueIter.hasNext()) {
-        LoadQueueItem lqi = queueIter.next();
-        String familyNameInHFile = Bytes.toString(lqi.family);
-        if (!familyNames.contains(familyNameInHFile)) {
-          unmatchedFamilies.add(familyNameInHFile);
-        }
-      }
-      if (unmatchedFamilies.size() > 0) {
-        String msg =
-            "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
-                + unmatchedFamilies + "; valid family names of table "
-                + table.getName() + " are: " + familyNames;
-        LOG.error(msg);
-        throw new IOException(msg);
-      }
+      prepareHFileQueue(hfofDir, table, queue, validateHFile);
+
       int count = 0;
 
       if (queue.isEmpty()) {
@@ -397,7 +369,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
               + count + " with " + queue.size() + " files remaining to group or split");
         }
 
-        int maxRetries = getConf().getInt("hbase.bulkload.retries.number", 10);
+        int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
         maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
         if (maxRetries != 0 && count >= maxRetries) {
           throw new IOException("Retry attempted " + count +
@@ -447,6 +419,85 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   /**
+   * Prepares a collection of {@link LoadQueueItem} from the list of source HFiles in the passed
+   * directory and validates that each queued HFile belongs to a column family of the table.
+   * @param hfofDir directory containing the HFiles to be loaded into the table
+   * @param table table into which the HFiles should be loaded
+   * @param queue queue to be populated with the HFiles to load
+   * @param validateHFile whether each discovered HFile should be validated before queueing
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfofDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile) throws IOException {
+    discoverLoadQueue(queue, hfofDir, validateHFile);
+    validateFamiliesInHFiles(table, queue);
+  }
+
+  // Initialize a thread pool
+  private ExecutorService createExecutorService() {
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
+    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), builder.build());
+    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  /**
+   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+   */
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+      throws IOException {
+    Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
+    List<String> familyNames = new ArrayList<String>(families.size());
+    for (HColumnDescriptor family : families) {
+      familyNames.add(family.getNameAsString());
+    }
+    List<String> unmatchedFamilies = new ArrayList<String>();
+    Iterator<LoadQueueItem> queueIter = queue.iterator();
+    while (queueIter.hasNext()) {
+      LoadQueueItem lqi = queueIter.next();
+      String familyNameInHFile = Bytes.toString(lqi.family);
+      if (!familyNames.contains(familyNameInHFile)) {
+        unmatchedFamilies.add(familyNameInHFile);
+      }
+    }
+    if (unmatchedFamilies.size() > 0) {
+      String msg =
+          "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
+              + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
+              + familyNames;
+      LOG.error(msg);
+      throw new IOException(msg);
+    }
+  }
+
+  /**
+   * Used by the replication sink to load the HFiles from the source cluster. It runs
+   * {@link LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)} followed by
+   * {@link LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)}.
+   * @param table table into which these HFiles should be loaded
+   * @param conn connection to use
+   * @param queue {@link LoadQueueItem}s holding the HFiles yet to be loaded
+   * @param startEndKeys starting and ending row keys of the regions
+   * @throws IOException If the bulk load of any queued HFile fails
+   */
+  public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
+      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    ExecutorService pool = null;
+    try {
+      pool = createExecutorService();
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
+          groupOrSplitPhase(table, pool, queue, startEndKeys);
+      bulkLoadPhase(table, conn, pool, queue, regionGroups);
+    } finally {
+      if (pool != null) {
+        pool.shutdown();
+      }
+    }
+  }
+
+  /**
    * This takes the LQI's grouped by likely regions and attempts to bulk load
    * them.  Any failures are re-queued for another pass with the
    * groupOrSplitPhase.
@@ -592,10 +643,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
     String uniqueName = getUniqueName();
     HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
+
     Path botOut = new Path(tmpDir, uniqueName + ".bottom");
     Path topOut = new Path(tmpDir, uniqueName + ".top");
-    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey,
-        botOut, topOut);
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
 
     FileSystem fs = tmpDir.getFileSystem(getConf());
     fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
@@ -626,6 +677,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       final Pair<byte[][], byte[][]> startEndKeys)
       throws IOException {
     final Path hfilePath = item.hfilePath;
+    // fs is the source filesystem
+    if (fs == null) {
+      fs = hfilePath.getFileSystem(getConf());
+    }
     HFile.Reader hfr = HFile.createReader(fs, hfilePath,
         new CacheConfig(getConf()), getConf());
     final byte[] first, last;
@@ -712,7 +767,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * failure
    */
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-      final TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
+      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
   throws IOException {
     final List<Pair<byte[], String>> famPaths =
       new ArrayList<Pair<byte[], String>>(lqis.size());
@@ -747,6 +802,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           //in user directory
           if(secureClient != null && !success) {
             FileSystem targetFs = FileSystem.get(getConf());
+            // fs is the source filesystem
+            if (fs == null) {
+              fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
+            }
             // Check to see if the source and target filesystems are the same
             // If they are the same filesystem, we will try move the files back
             // because previously we moved them to the staging directory.
@@ -1000,4 +1059,17 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     System.exit(ret);
   }
 
+  /**
+   * Called from the replication sink, which manages the bulkToken (staging directory) itself. This
+   * is used only when {@link SecureBulkLoadEndpoint} is configured in the
+   * hbase.coprocessor.region.classes property. The staging directory is a temporary location to
+   * which all files are first copied or moved from the user-supplied directory and given the
+   * required permissions before they are finally loaded into the table. Set this only if the
+   * caller wants to manage the staging directory itself; otherwise this tool handles it.
+   * @param stagingDir staging directory path
+   */
+  public void setBulkToken(String stagingDir) {
+    this.bulkToken = stagingDir;
+  }
+
 }
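
A minimal sketch of how a caller such as the replication sink might drive the entry points made public above; it assumes an already-open Connection, Table and RegionLocator, a Configuration, and a staging directory the caller manages itself, and LinkedList is just one possible Deque implementation:

    // Sketch only: conf, conn, table, regionLocator, stagingDir and hfileDir are assumed to exist.
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.setBulkToken(stagingDir); // only when the caller manages the staging directory itself
    Deque<LoadIncrementalHFiles.LoadQueueItem> queue = new LinkedList<>();
    loader.prepareHFileQueue(hfileDir, table, queue, false);  // discover HFiles, validate families
    loader.loadHFileQueue(table, conn, queue,
        regionLocator.getStartEndKeys());                     // group/split, then bulk load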

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index d6a120b..91185af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -28,22 +28,23 @@ import java.util.Map;
 import java.util.NavigableMap;
 import java.util.UUID;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.ByteStringer;
 import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hbase.wal.WALKey;
 
 import com.google.protobuf.ServiceException;
 
@@ -51,15 +52,20 @@ import com.google.protobuf.ServiceException;
 public class ReplicationProtbufUtil {
   /**
    * A helper to replicate a list of WAL entries using admin protocol.
-   *
-   * @param admin
-   * @param entries
+   * @param admin Admin service
+   * @param entries Array of WAL entries to be replicated
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
    * @throws java.io.IOException
    */
   public static void replicateWALEntry(final AdminService.BlockingInterface admin,
-      final Entry[] entries) throws IOException {
+      final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
+      Path sourceHFileArchiveDir) throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-      buildReplicateWALEntryRequest(entries, null);
+        buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
+          sourceHFileArchiveDir);
     PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
     try {
       admin.replicateWALEntry(controller, p.getFirst());
@@ -77,19 +83,22 @@ public class ReplicationProtbufUtil {
    */
   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
       buildReplicateWALEntryRequest(final Entry[] entries) {
-    return buildReplicateWALEntryRequest(entries, null);
+    return buildReplicateWALEntryRequest(entries, null, null, null, null);
   }
 
   /**
    * Create a new ReplicateWALEntryRequest from a list of WAL entries
-   *
    * @param entries the WAL entries to be replicated
    * @param encodedRegionName alternative region name to use if not null
-   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
-   * found.
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
+   * @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
+   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
    */
   public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
-      buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName) {
+      buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
+          String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
     // Accumulate all the Cells seen in here.
     List<List<? extends Cell>> allCells = new ArrayList<List<? extends Cell>>(entries.length);
     int size = 0;
@@ -146,6 +155,17 @@ public class ReplicationProtbufUtil {
       entryBuilder.setAssociatedCellCount(cells.size());
       builder.addEntry(entryBuilder.build());
     }
+
+    if (replicationClusterId != null) {
+      builder.setReplicationClusterId(replicationClusterId);
+    }
+    if (sourceBaseNamespaceDir != null) {
+      builder.setSourceBaseNamespaceDirPath(sourceBaseNamespaceDir.toString());
+    }
+    if (sourceHFileArchiveDir != null) {
+      builder.setSourceHFileArchiveDirPath(sourceHFileArchiveDir.toString());
+    }
+
     return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
       getCellScanner(allCells, size));
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index d94e11c..0c9b0e6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1800,7 +1800,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         List<WALEntry> entries = request.getEntryList();
         CellScanner cellScanner = ((PayloadCarryingRpcController)controller).cellScanner();
         regionServer.getRegionServerCoprocessorHost().preReplicateLogEntries(entries, cellScanner);
-        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner);
+        regionServer.replicationSinkHandler.replicateLogEntries(entries, cellScanner,
+          request.getReplicationClusterId(), request.getSourceBaseNamespaceDirPath(),
+          request.getSourceHFileArchiveDirPath());
         regionServer.getRegionServerCoprocessorHost().postReplicateLogEntries(entries, cellScanner);
         return ReplicateWALEntryResponse.newBuilder().build();
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
index 5f96bf7..836d3aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
@@ -36,7 +36,13 @@ public interface ReplicationSinkService extends ReplicationService {
    * Carry on the list of log entries down to the sink
    * @param entries list of WALEntries to replicate
    * @param cells Cells that the WALEntries refer to (if cells is non-null)
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+   *          directory, required for replicating hfiles
+   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
    * @throws IOException
    */
-  void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+  void replicateLogEntries(List<WALEntry> entries, CellScanner cells, String replicationClusterId,
+      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath) throws IOException;
 }
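
For completeness, the shape an implementation of the widened interface now takes; this is a sketch only, with the body elided:

    // Sketch only: real implementations apply the WAL edits and, when bulk load
    // descriptors are present, use the source paths to fetch and load the HFiles.
    @Override
    public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
        String replicationClusterId, String sourceBaseNamespaceDirPath,
        String sourceHFileArchiveDirPath) throws IOException {
      // ...
    }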


[2/3] hbase git commit: HBASE-13153 Bulk Loaded HFile Replication (Ashish Singhi)

Posted by ra...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
index 457d859..db98083 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALActionsListener.java
@@ -85,17 +85,16 @@ public interface WALActionsListener {
   );
 
   /**
-   *
    * @param htd
    * @param logKey
-   * @param logEdit
-   * TODO: Retire this in favor of {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)}
-   * It only exists to get scope when replicating.  Scope should be in the WALKey and not need
-   * us passing in a <code>htd</code>.
+   * @param logEdit TODO: Retire this in favor of
+   *          {@link #visitLogEntryBeforeWrite(HRegionInfo, WALKey, WALEdit)} It only exists to get
+   *          scope when replicating. Scope should be in the WALKey and not need us passing in a
+   *          <code>htd</code>.
+   * @throws IOException If failed to parse the WALEdit
    */
-  void visitLogEntryBeforeWrite(
-    HTableDescriptor htd, WALKey logKey, WALEdit logEdit
-  );
+  void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+      throws IOException;
 
   /**
    * For notification post append to the writer.  Used by metrics system at least.
@@ -136,7 +135,9 @@ public interface WALActionsListener {
     public void visitLogEntryBeforeWrite(HRegionInfo info, WALKey logKey, WALEdit logEdit) {}
 
     @Override
-    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit) {}
+    public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+        throws IOException {
+    }
 
     @Override
     public void postAppend(final long entryLen, final long elapsedTimeMillis) {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 3501f3e..f97ec15 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -18,13 +18,21 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.NavigableMap;
 
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
 /**
@@ -32,6 +40,7 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
  */
 @InterfaceAudience.Private
 public class ScopeWALEntryFilter implements WALEntryFilter {
+  private static final Log LOG = LogFactory.getLog(ScopeWALEntryFilter.class);
 
   @Override
   public Entry filter(Entry entry) {
@@ -41,13 +50,27 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
     }
     ArrayList<Cell> cells = entry.getEdit().getCells();
     int size = cells.size();
+    byte[] fam;
     for (int i = size - 1; i >= 0; i--) {
       Cell cell = cells.get(i);
-      // The scope will be null or empty if
-      // there's nothing to replicate in that WALEdit
-      byte[] fam = CellUtil.cloneFamily(cell);
-      if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
-        cells.remove(i);
+      // If a bulk load entry has a scope then that means the user has enabled replication for bulk
+      // load hfiles.
+      // TODO There is a similar logic in TableCfWALEntryFilter but data structures are different so
+      // cannot refactor into one now, can revisit and see if any way to unify them.
+      if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+        Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(scopes, cell);
+        if (filteredBulkLoadEntryCell != null) {
+          cells.set(i, filteredBulkLoadEntryCell);
+        } else {
+          cells.remove(i);
+        }
+      } else {
+        // The scope will be null or empty if
+        // there's nothing to replicate in that WALEdit
+        fam = CellUtil.cloneFamily(cell);
+        if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+          cells.remove(i);
+        }
       }
     }
     if (cells.size() < size / 2) {
@@ -56,4 +79,41 @@ public class ScopeWALEntryFilter implements WALEntryFilter {
     return entry;
   }
 
+  private Cell filterBulkLoadEntries(NavigableMap<byte[], Integer> scopes, Cell cell) {
+    byte[] fam;
+    BulkLoadDescriptor bld = null;
+    try {
+      bld = WALEdit.getBulkLoadDescriptor(cell);
+    } catch (IOException e) {
+      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+      return cell;
+    }
+    List<StoreDescriptor> storesList = bld.getStoresList();
+    // Copy the StoreDescriptor list and update it as storesList is an unmodifiable list
+    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+    boolean anyStoreRemoved = false;
+    while (copiedStoresListIterator.hasNext()) {
+      StoreDescriptor sd = copiedStoresListIterator.next();
+      fam = sd.getFamilyName().toByteArray();
+      if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
+        copiedStoresListIterator.remove();
+        anyStoreRemoved = true;
+      }
+    }
+
+    if (!anyStoreRemoved) {
+      return cell;
+    } else if (copiedStoresList.isEmpty()) {
+      return null;
+    }
+    BulkLoadDescriptor.Builder newDesc =
+        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+            .setEncodedRegionName(bld.getEncodedRegionName())
+            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+    newDesc.addAllStores(copiedStoresList);
+    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+      cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+  }
 }
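
The filterBulkLoadEntries() method above keeps a store descriptor only when its column family has a
non-local replication scope, and drops the whole bulk load marker cell when no store survives. Below
is a minimal standalone sketch of that per-family decision, with the HBase Cell and protobuf types
replaced by plain strings and integers purely for illustration (an assumption, not the real API):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Minimal sketch of the family-scope filtering applied to a bulk load descriptor.
 * An empty result models "drop the whole bulk load marker cell"; a result equal to
 * the input models "no store removed, keep the original cell".
 */
public class ScopeFilterSketch {
  static final int REPLICATION_SCOPE_LOCAL = 0;

  static List<String> filterFamilies(Map<String, Integer> scopes, List<String> families) {
    List<String> kept = new ArrayList<>();
    for (String family : families) {
      Integer scope = scopes.get(family);
      // Keep only families with a known, non-local replication scope.
      if (scope != null && scope != REPLICATION_SCOPE_LOCAL) {
        kept.add(family);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    Map<String, Integer> scopes = new TreeMap<>();
    scopes.put("cf1", 1);                       // global scope: replicated
    scopes.put("cf2", REPLICATION_SCOPE_LOCAL); // local scope: not replicated
    System.out.println(filterFamilies(scopes, Arrays.asList("cf1", "cf2"))); // [cf1]
    System.out.println(filterFamilies(scopes, Arrays.asList("cf2")));        // [] -> drop the cell
  }
}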

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
index 642ee8a..f10849b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/TableCfWALEntryFilter.java
@@ -18,14 +18,20 @@
 
 package org.apache.hadoop.hbase.replication;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 
@@ -52,19 +58,36 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
     }
     int size = cells.size();
 
+    // If null, the user has explicitly not configured any table CFs, so data from all tables is
+    // applicable for replication
+    if (tableCFs == null) {
+      return entry;
+    }
     // return null(prevent replicating) if logKey's table isn't in this peer's
-    // replicable table list (empty tableCFs means all table are replicable)
-    if (tableCFs != null && !tableCFs.containsKey(tabName)) {
+    // replicable table list
+    if (!tableCFs.containsKey(tabName)) {
       return null;
     } else {
-      List<String> cfs = (tableCFs == null) ? null : tableCFs.get(tabName);
+      List<String> cfs = tableCFs.get(tabName);
       for (int i = size - 1; i >= 0; i--) {
         Cell cell = cells.get(i);
-        // ignore(remove) kv if its cf isn't in the replicable cf list
-        // (empty cfs means all cfs of this table are replicable)
-        if ((cfs != null) && !cfs.contains(Bytes.toString(
-            cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength()))) {
-          cells.remove(i);
+        // TODO There is a similar logic in ScopeWALEntryFilter but data structures are different so
+        // cannot refactor into one now, can revisit and see if any way to unify them.
+        // Filter bulk load entries separately
+        if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
+          Cell filteredBulkLoadEntryCell = filterBulkLoadEntries(cfs, cell);
+          if (filteredBulkLoadEntryCell != null) {
+            cells.set(i, filteredBulkLoadEntryCell);
+          } else {
+            cells.remove(i);
+          }
+        } else {
+          // ignore(remove) kv if its cf isn't in the replicable cf list
+          // (empty cfs means all cfs of this table are replicable)
+          if ((cfs != null) && !cfs.contains(Bytes.toString(cell.getFamilyArray(),
+            cell.getFamilyOffset(), cell.getFamilyLength()))) {
+            cells.remove(i);
+          }
         }
       }
     }
@@ -74,4 +97,41 @@ public class TableCfWALEntryFilter implements WALEntryFilter {
     return entry;
   }
 
+  private Cell filterBulkLoadEntries(List<String> cfs, Cell cell) {
+    byte[] fam;
+    BulkLoadDescriptor bld = null;
+    try {
+      bld = WALEdit.getBulkLoadDescriptor(cell);
+    } catch (IOException e) {
+      LOG.warn("Failed to get bulk load events information from the WAL file.", e);
+      return cell;
+    }
+    List<StoreDescriptor> storesList = bld.getStoresList();
+    // Copy the StoreDescriptor list and update it as storesList is an unmodifiable list
+    List<StoreDescriptor> copiedStoresList = new ArrayList<StoreDescriptor>(storesList);
+    Iterator<StoreDescriptor> copiedStoresListIterator = copiedStoresList.iterator();
+    boolean anyStoreRemoved = false;
+    while (copiedStoresListIterator.hasNext()) {
+      StoreDescriptor sd = copiedStoresListIterator.next();
+      fam = sd.getFamilyName().toByteArray();
+      if (cfs != null && !cfs.contains(Bytes.toString(fam))) {
+        copiedStoresListIterator.remove();
+        anyStoreRemoved = true;
+      }
+    }
+
+    if (!anyStoreRemoved) {
+      return cell;
+    } else if (copiedStoresList.isEmpty()) {
+      return null;
+    }
+    BulkLoadDescriptor.Builder newDesc =
+        BulkLoadDescriptor.newBuilder().setTableName(bld.getTableName())
+            .setEncodedRegionName(bld.getEncodedRegionName())
+            .setBulkloadSeqNum(bld.getBulkloadSeqNum());
+    newDesc.addAllStores(copiedStoresList);
+    BulkLoadDescriptor newBulkLoadDescriptor = newDesc.build();
+    return CellUtil.createCell(CellUtil.cloneRow(cell), WALEdit.METAFAMILY, WALEdit.BULK_LOAD,
+      cell.getTimestamp(), cell.getTypeByte(), newBulkLoadDescriptor.toByteArray());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
new file mode 100644
index 0000000..9bfea4b
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationHFileCleaner.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.master;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ZooKeeperConnectionException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
+import org.apache.hadoop.hbase.replication.ReplicationFactory;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * Implementation of a file cleaner that checks if a hfile is still scheduled for replication before
+ * deleting it from hfile archive directory.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
+public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate implements Abortable {
+  private static final Log LOG = LogFactory.getLog(ReplicationHFileCleaner.class);
+  private ZooKeeperWatcher zkw;
+  private ReplicationQueuesClient rqc;
+  private boolean stopped = false;
+  private boolean aborted;
+
+  @Override
+  public Iterable<FileStatus> getDeletableFiles(Iterable<FileStatus> files) {
+    // all members of this class are null if replication is disabled,
+    // so we cannot filter the files
+    if (this.getConf() == null) {
+      return files;
+    }
+
+    final Set<String> hfileRefs;
+    try {
+      // The concurrently created new hfile entries in ZK may not be included in the return list,
+      // but they won't be deleted because they're not in the checking set.
+      hfileRefs = loadHFileRefsFromPeers();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
+      return Collections.emptyList();
+    }
+    return Iterables.filter(files, new Predicate<FileStatus>() {
+      @Override
+      public boolean apply(FileStatus file) {
+        String hfile = file.getPath().getName();
+        boolean foundHFileRefInQueue = hfileRefs.contains(hfile);
+        if (LOG.isDebugEnabled()) {
+          if (foundHFileRefInQueue) {
+            LOG.debug("Found hfile reference in ZK, keeping: " + hfile);
+          } else {
+            LOG.debug("Did not find hfile reference in ZK, deleting: " + hfile);
+          }
+        }
+        return !foundHFileRefInQueue;
+      }
+    });
+  }
+
+  /**
+   * Load all hfile references in all replication queues from ZK. This method guarantees to return a
+   * snapshot which contains all hfile references in the zookeeper at the start of this call.
+   * However, some newly created hfile references during the call may not be included.
+   */
+  private Set<String> loadHFileRefsFromPeers() throws KeeperException {
+    Set<String> hfileRefs = Sets.newHashSet();
+    List<String> listOfPeers;
+    for (int retry = 0;; retry++) {
+      int v0 = rqc.getHFileRefsNodeChangeVersion();
+      hfileRefs.clear();
+      listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
+      if (listOfPeers == null) {
+        LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
+        return ImmutableSet.of();
+      }
+      for (String id : listOfPeers) {
+        List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
+        if (peerHFileRefs != null) {
+          hfileRefs.addAll(peerHFileRefs);
+        }
+      }
+      int v1 = rqc.getHFileRefsNodeChangeVersion();
+      if (v0 == v1) {
+        return hfileRefs;
+      }
+      LOG.debug(String.format("Replication hfile references node cversion changed from "
+          + "%d to %d, retry = %d", v0, v1, retry));
+    }
+  }
+
+  @Override
+  public void setConf(Configuration config) {
+    // If either replication or replication of bulk load hfiles is disabled, keep all members null
+    if (!(config.getBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT) && config.getBoolean(
+      HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT))) {
+      LOG.warn(HConstants.REPLICATION_ENABLE_KEY
+          + " is not enabled so allowing all hfile references to be deleted. Better to remove "
+          + ReplicationHFileCleaner.class + " from " + HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS
+          + " configuration.");
+      return;
+    }
+    // Make my own Configuration. Then I'll have my own connection to zk that
+    // I can close myself when time comes.
+    Configuration conf = new Configuration(config);
+    super.setConf(conf);
+    try {
+      initReplicationQueuesClient(conf);
+    } catch (IOException e) {
+      LOG.error("Error while configuring " + this.getClass().getName(), e);
+    }
+  }
+
+  private void initReplicationQueuesClient(Configuration conf)
+      throws ZooKeeperConnectionException, IOException {
+    this.zkw = new ZooKeeperWatcher(conf, "replicationHFileCleaner", null);
+    this.rqc = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
+  }
+
+  @Override
+  public void stop(String why) {
+    if (this.stopped) {
+      return;
+    }
+    this.stopped = true;
+    if (this.zkw != null) {
+      LOG.info("Stopping " + this.zkw);
+      this.zkw.close();
+    }
+  }
+
+  @Override
+  public boolean isStopped() {
+    return this.stopped;
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.warn("Aborting ReplicationHFileCleaner because " + why, e);
+    this.aborted = true;
+    stop(why);
+  }
+
+  @Override
+  public boolean isAborted() {
+    return this.aborted;
+  }
+
+  @Override
+  public boolean isFileDeletable(FileStatus fStat) {
+    Set<String> hfileRefsFromQueue;
+    // all members of this class are null if replication is disabled,
+    // so do not prevent the file from being deleted
+    if (getConf() == null) {
+      return true;
+    }
+
+    try {
+      hfileRefsFromQueue = loadHFileRefsFromPeers();
+    } catch (KeeperException e) {
+      LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
+          + "file for " + fStat.getPath());
+      return false;
+    }
+    return !hfileRefsFromQueue.contains(fStat.getPath().getName());
+  }
+}
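
loadHFileRefsFromPeers() above takes an optimistic snapshot: it records the hfile-refs znode
cversion, reads every peer's references, re-reads the cversion, and retries the whole read if the
version moved in between. A compact sketch of that loop, with ZooKeeper access abstracted behind
suppliers (the suppliers are stand-ins for illustration, not part of the HBase API):

import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

/** Sketch of a version-checked snapshot read that retries until it sees a stable view. */
public class SnapshotLoopSketch {
  static Set<String> loadStableSnapshot(IntSupplier cversion, Supplier<Set<String>> readRefs) {
    for (int retry = 0;; retry++) {
      int before = cversion.getAsInt();           // version seen before reading
      Set<String> refs = new HashSet<>(readRefs.get());
      int after = cversion.getAsInt();            // version seen after reading
      if (before == after) {
        return refs;                              // nothing changed while we were reading
      }
      System.out.printf("refs node cversion changed %d -> %d, retry %d%n", before, after, retry);
    }
  }

  public static void main(String[] args) {
    Set<String> zkRefs = new HashSet<>(Arrays.asList("hfile-1", "hfile-2"));
    System.out.println(loadStableSnapshot(() -> 7, () -> zkRefs)); // stable cversion: one pass
  }
}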

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
new file mode 100644
index 0000000..8d5c6d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DefaultSourceFSConfigurationProvider.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This will load all the xml configuration files for the source cluster replication ID from the
+ * user-configured replication configuration directory.
+ */
+@InterfaceAudience.Private
+public class DefaultSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
+  private static final Log LOG = LogFactory.getLog(DefaultSourceFSConfigurationProvider.class);
+  // Map containing all the source clusters configurations against their replication cluster id
+  private Map<String, Configuration> sourceClustersConfs = new HashMap<>();
+  private static final String XML = ".xml";
+
+  @Override
+  public Configuration getConf(Configuration sinkConf, String replicationClusterId)
+      throws IOException {
+    if (sourceClustersConfs.get(replicationClusterId) == null) {
+      synchronized (this.sourceClustersConfs) {
+        if (sourceClustersConfs.get(replicationClusterId) == null) {
+          LOG.info("Loading source cluster FS client conf for cluster " + replicationClusterId);
+          // Load only user provided client configurations.
+          Configuration sourceClusterConf = new Configuration(false);
+
+          String replicationConfDir = sinkConf.get(HConstants.REPLICATION_CONF_DIR);
+          if (replicationConfDir == null) {
+            LOG.debug(HConstants.REPLICATION_CONF_DIR + " is not configured.");
+            URL resource = HBaseConfiguration.class.getClassLoader().getResource("hbase-site.xml");
+            if (resource != null) {
+              String path = resource.getPath();
+              replicationConfDir = path.substring(0, path.lastIndexOf("/"));
+            } else {
+              replicationConfDir = System.getenv("HBASE_CONF_DIR");
+            }
+          }
+
+          LOG.info("Loading source cluster " + replicationClusterId
+              + " file system configurations from xml files under directory " + replicationConfDir);
+          File confDir = new File(replicationConfDir, replicationClusterId);
+          String[] listofConfFiles = FileUtil.list(confDir);
+          for (String confFile : listofConfFiles) {
+            if (new File(confDir, confFile).isFile() && confFile.endsWith(XML)) {
+              // Add all the user provided client conf files
+              sourceClusterConf.addResource(new Path(confDir.getPath(), confFile));
+            }
+          }
+          this.sourceClustersConfs.put(replicationClusterId, sourceClusterConf);
+        }
+      }
+    }
+    return this.sourceClustersConfs.get(replicationClusterId);
+  }
+
+}
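
getConf() above lazily loads and caches one Configuration per replication cluster id using a
check-lock-recheck pattern. A stripped-down sketch of the same caching shape, with the Hadoop
Configuration replaced by a String and the xml loading stubbed out (both assumptions made only for
brevity):

import java.util.HashMap;
import java.util.Map;

/** Sketch of a lazily populated, per-cluster-id configuration cache. */
public class ConfCacheSketch {
  private final Map<String, String> cache = new HashMap<>();

  String getConf(String clusterId) {
    if (cache.get(clusterId) == null) {
      synchronized (cache) {
        if (cache.get(clusterId) == null) {       // re-check under the lock
          cache.put(clusterId, loadFromDisk(clusterId));
        }
      }
    }
    return cache.get(clusterId);
  }

  private String loadFromDisk(String clusterId) {
    // Stand-in for reading <replication conf dir>/<clusterId>/*.xml
    return "conf-for-" + clusterId;
  }

  public static void main(String[] args) {
    ConfCacheSketch c = new ConfCacheSketch();
    System.out.println(c.getConf("peer-1"));
    System.out.println(c.getConf("peer-1"));      // second call is served from the cache
  }
}

In new code a ConcurrentHashMap with computeIfAbsent would give the same effect with less ceremony;
the sketch mirrors the structure used in the patch instead.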

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 7c07ecc..d51d512 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -37,24 +37,26 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.ipc.RemoteException;
 
 /**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} 
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
  * implementation for replicating to another HBase cluster.
  * For the slave cluster it selects a random number of peers
  * using a replication ratio. For example, if replication ratio = 0.1
@@ -84,8 +86,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   // Handles connecting to peer region servers
   private ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
+  private String replicationClusterId = "";
   private ThreadPoolExecutor exec;
   private int maxThreads;
+  private Path baseNamespaceDir;
+  private Path hfileArchiveDir;
+  private boolean replicationBulkLoadDataEnabled;
 
   @Override
   public void init(Context context) throws IOException {
@@ -108,7 +114,19 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
     this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
-      new SynchronousQueue<Runnable>());
+        new SynchronousQueue<Runnable>());
+
+    this.replicationBulkLoadDataEnabled =
+        conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+          HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+    if (this.replicationBulkLoadDataEnabled) {
+      replicationClusterId = this.conf.get(HConstants.REPLICATION_CLUSTER_ID);
+    }
+    // Construct base namespace directory and hfile archive directory path
+    Path rootDir = FSUtils.getRootDir(conf);
+    Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
+    baseNamespaceDir = new Path(rootDir, baseNSDir);
+    hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
   }
 
   private void decorateConf() {
@@ -317,8 +335,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       try {
         sinkPeer = replicationSinkMgr.getReplicationSink();
         BlockingInterface rrs = sinkPeer.getRegionServer();
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new Entry[entries.size()]));
+        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
         replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return ordinal;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
new file mode 100644
index 0000000..17f6780
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.math.BigInteger;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+
+/**
+ * It is used for replicating HFile entries. It will first copy all the hfiles in parallel to a
+ * local staging directory and then use {@link LoadIncrementalHFiles} to prepare a collection of
+ * {@link LoadQueueItem} which will finally be loaded (replicated) into the table of this cluster.
+ */
+@InterfaceAudience.Private
+public class HFileReplicator {
+  /** Maximum number of threads to allow in pool to copy hfiles during replication */
+  public static final String REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY =
+      "hbase.replication.bulkload.copy.maxthreads";
+  public static final int REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT = 10;
+  /** Number of hfiles to copy per thread during replication */
+  public static final String REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY =
+      "hbase.replication.bulkload.copy.hfiles.perthread";
+  public static final int REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT = 10;
+
+  private static final Log LOG = LogFactory.getLog(HFileReplicator.class);
+  private final String UNDERSCORE = "_";
+  private final static FsPermission PERM_ALL_ACCESS = FsPermission.valueOf("-rwxrwxrwx");
+
+  private Configuration sourceClusterConf;
+  private String sourceBaseNamespaceDirPath;
+  private String sourceHFileArchiveDirPath;
+  private Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap;
+  private FileSystem sinkFs;
+  private FsDelegationToken fsDelegationToken;
+  private UserProvider userProvider;
+  private Configuration conf;
+  private Connection connection;
+  private String hbaseStagingDir;
+  private ThreadPoolExecutor exec;
+  private int maxCopyThreads;
+  private int copiesPerThread;
+
+  public HFileReplicator(Configuration sourceClusterConf,
+      String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
+      Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
+      Connection connection) throws IOException {
+    this.sourceClusterConf = sourceClusterConf;
+    this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
+    this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
+    this.bulkLoadHFileMap = tableQueueMap;
+    this.conf = conf;
+    this.connection = connection;
+
+    userProvider = UserProvider.instantiate(conf);
+    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+    this.hbaseStagingDir = conf.get("hbase.bulkload.staging.dir");
+    this.maxCopyThreads =
+        this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
+          REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("HFileReplicationCallable-%1$d");
+    this.exec =
+        new ThreadPoolExecutor(1, maxCopyThreads, 60, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(), builder.build());
+    this.exec.allowCoreThreadTimeOut(true);
+    this.copiesPerThread =
+        conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
+          REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
+
+    sinkFs = FileSystem.get(conf);
+  }
+
+  public Void replicate() throws IOException {
+    // Copy all the hfiles to the local file system
+    Map<String, Path> tableStagingDirsMap = copyHFilesToStagingDir();
+
+    int maxRetries = conf.getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+
+    for (Entry<String, Path> tableStagingDir : tableStagingDirsMap.entrySet()) {
+      String tableNameString = tableStagingDir.getKey();
+      Path stagingDir = tableStagingDir.getValue();
+
+      LoadIncrementalHFiles loadHFiles = null;
+      try {
+        loadHFiles = new LoadIncrementalHFiles(conf);
+      } catch (Exception e) {
+        LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
+            + " data.", e);
+        throw new IOException(e);
+      }
+      Configuration newConf = HBaseConfiguration.create(conf);
+      newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
+      loadHFiles.setConf(newConf);
+
+      TableName tableName = TableName.valueOf(tableNameString);
+      Table table = this.connection.getTable(tableName);
+
+      // Prepare collection of queue of hfiles to be loaded(replicated)
+      Deque<LoadQueueItem> queue = new LinkedList<LoadQueueItem>();
+      loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);
+
+      if (queue.isEmpty()) {
+        LOG.warn("Replication process did not find any files to replicate in directory "
+            + stagingDir.toUri());
+        return null;
+      }
+
+      try (RegionLocator locator = connection.getRegionLocator(tableName)) {
+
+        fsDelegationToken.acquireDelegationToken(sinkFs);
+
+        // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
+        // data
+        loadHFiles.setBulkToken(stagingDir.toString());
+
+        doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
+      } finally {
+        cleanup(stagingDir.toString(), table);
+      }
+    }
+    return null;
+  }
+
+  private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
+      Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
+    int count = 0;
+    Pair<byte[][], byte[][]> startEndKeys;
+    while (!queue.isEmpty()) {
+      // need to reload split keys each iteration.
+      startEndKeys = locator.getStartEndKeys();
+      if (count != 0) {
+        LOG.warn("Error occured while replicating HFiles, retry attempt " + count + " with "
+            + queue.size() + " files still remaining to replicate.");
+      }
+
+      if (maxRetries != 0 && count >= maxRetries) {
+        throw new IOException("Retry attempted " + count
+            + " times without completing, bailing out.");
+      }
+      count++;
+
+      // Try bulk load
+      loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
+    }
+  }
+
+  private void cleanup(String stagingDir, Table table) {
+    // Release the file system delegation token
+    fsDelegationToken.releaseDelegationToken();
+    // Delete the staging directory
+    if (stagingDir != null) {
+      try {
+        sinkFs.delete(new Path(stagingDir), true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete the staging directory " + stagingDir, e);
+      }
+    }
+    // Do not close the file system
+
+    /*
+     * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
+     * "Failed to close the file system"); } }
+     */
+
+    // Close the table
+    if (table != null) {
+      try {
+        table.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close the table.", e);
+      }
+    }
+  }
+
+  private Map<String, Path> copyHFilesToStagingDir() throws IOException {
+    Map<String, Path> mapOfCopiedHFiles = new HashMap<String, Path>();
+    Pair<byte[], List<String>> familyHFilePathsPair;
+    List<String> hfilePaths;
+    byte[] family;
+    Path familyStagingDir;
+    int familyHFilePathsPairsListSize;
+    int totalNoOfHFiles;
+    List<Pair<byte[], List<String>>> familyHFilePathsPairsList;
+    FileSystem sourceFs = null;
+
+    try {
+      Path sourceClusterPath = new Path(sourceBaseNamespaceDirPath);
+      /*
+       * Path#getFileSystem will by default get the FS from cache. If both the source and sink
+       * clusters have the same FS name service then it will return the peer cluster FS. To avoid
+       * this we explicitly disable loading the FS from the cache, so that a new FS is created with
+       * the source cluster configuration.
+       */
+      String sourceScheme = sourceClusterPath.toUri().getScheme();
+      String disableCacheName =
+          String.format("fs.%s.impl.disable.cache", new Object[] { sourceScheme });
+      sourceClusterConf.setBoolean(disableCacheName, true);
+
+      sourceFs = sourceClusterPath.getFileSystem(sourceClusterConf);
+
+      User user = userProvider.getCurrent();
+      // For each table name in the map
+      for (Entry<String, List<Pair<byte[], List<String>>>> tableEntry : bulkLoadHFileMap
+          .entrySet()) {
+        String tableName = tableEntry.getKey();
+
+        // Create staging directory for each table
+        Path stagingDir =
+            createStagingDir(new Path(hbaseStagingDir), user, TableName.valueOf(tableName));
+
+        familyHFilePathsPairsList = tableEntry.getValue();
+        familyHFilePathsPairsListSize = familyHFilePathsPairsList.size();
+
+        // For each list of family hfile paths pair in the table
+        for (int i = 0; i < familyHFilePathsPairsListSize; i++) {
+          familyHFilePathsPair = familyHFilePathsPairsList.get(i);
+
+          family = familyHFilePathsPair.getFirst();
+          hfilePaths = familyHFilePathsPair.getSecond();
+
+          familyStagingDir = new Path(stagingDir, Bytes.toString(family));
+          totalNoOfHFiles = hfilePaths.size();
+
+          // For each list of hfile paths for the family
+          List<Future<Void>> futures = new ArrayList<Future<Void>>();
+          Callable<Void> c;
+          Future<Void> future;
+          int currentCopied = 0;
+          // Copy the hfiles in parallel
+          while (totalNoOfHFiles > currentCopied + this.copiesPerThread) {
+            c =
+                new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
+                  currentCopied + this.copiesPerThread));
+            future = exec.submit(c);
+            futures.add(future);
+            currentCopied += this.copiesPerThread;
+          }
+
+          int remaining = totalNoOfHFiles - currentCopied;
+          if (remaining > 0) {
+            c =
+                new Copier(sourceFs, familyStagingDir, hfilePaths.subList(currentCopied,
+                  currentCopied + remaining));
+            future = exec.submit(c);
+            futures.add(future);
+          }
+
+          for (Future<Void> f : futures) {
+            try {
+              f.get();
+            } catch (InterruptedException e) {
+              InterruptedIOException iioe =
+                  new InterruptedIOException(
+                      "Failed to copy HFiles to local file system. This will be retried again "
+                          + "by the source cluster.");
+              iioe.initCause(e);
+              throw iioe;
+            } catch (ExecutionException e) {
+              throw new IOException("Failed to copy HFiles to local file system. This will "
+                  + "be retried again by the source cluster.", e);
+            }
+          }
+        }
+        // Record the staging directory for this table. The staging directory contains all the
+        // hfiles belonging to this table
+        mapOfCopiedHFiles.put(tableName, stagingDir);
+      }
+      return mapOfCopiedHFiles;
+    } finally {
+      if (sourceFs != null) {
+        sourceFs.close();
+      }
+      if(exec != null) {
+        exec.shutdown();
+      }
+    }
+  }
+
+  private Path createStagingDir(Path baseDir, User user, TableName tableName) throws IOException {
+    String tblName = tableName.getNameAsString().replace(":", UNDERSCORE);
+    int RANDOM_WIDTH = 320;
+    int RANDOM_RADIX = 32;
+    String doubleUnderScore = UNDERSCORE + UNDERSCORE;
+    String randomDir = user.getShortName() + doubleUnderScore + tblName + doubleUnderScore
+        + (new BigInteger(RANDOM_WIDTH, new SecureRandom()).toString(RANDOM_RADIX));
+    return createStagingDir(baseDir, user, randomDir);
+  }
+
+  private Path createStagingDir(Path baseDir, User user, String randomDir) throws IOException {
+    Path p = new Path(baseDir, randomDir);
+    sinkFs.mkdirs(p, PERM_ALL_ACCESS);
+    sinkFs.setPermission(p, PERM_ALL_ACCESS);
+    return p;
+  }
+
+  /**
+   * This class will copy the given hfiles from the given source file system to the given local file
+   * system staging directory.
+   */
+  private class Copier implements Callable<Void> {
+    private FileSystem sourceFs;
+    private Path stagingDir;
+    private List<String> hfiles;
+
+    public Copier(FileSystem sourceFs, final Path stagingDir, final List<String> hfiles)
+        throws IOException {
+      this.sourceFs = sourceFs;
+      this.stagingDir = stagingDir;
+      this.hfiles = hfiles;
+    }
+
+    @Override
+    public Void call() throws IOException {
+      Path sourceHFilePath;
+      Path localHFilePath;
+      int totalHFiles = hfiles.size();
+      for (int i = 0; i < totalHFiles; i++) {
+        sourceHFilePath = new Path(sourceBaseNamespaceDirPath, hfiles.get(i));
+        localHFilePath = new Path(stagingDir, sourceHFilePath.getName());
+        try {
+          FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+          // Any exception other than FNFE will fail the replication request and the
+          // source will retry replicating this data.
+        } catch (FileNotFoundException e) {
+          LOG.info("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+              + ". Trying to copy from hfile archive directory.",
+            e);
+          sourceHFilePath = new Path(sourceHFileArchiveDirPath, hfiles.get(i));
+
+          try {
+            FileUtil.copy(sourceFs, sourceHFilePath, sinkFs, localHFilePath, false, conf);
+          } catch (FileNotFoundException e1) {
+            // This means the hfile does not exist anywhere in the source cluster FS, so we
+            // cannot do anything here; just log and return.
+            LOG.error("Failed to copy hfile from " + sourceHFilePath + " to " + localHFilePath
+                + ". Hence ignoring this hfile from replication..",
+              e1);
+            return null;
+          }
+        }
+        sinkFs.setPermission(localHFilePath, PERM_ALL_ACCESS);
+      }
+      return null;
+    }
+  }
+}
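
copyHFilesToStagingDir() above splits each family's hfile list into chunks of
hbase.replication.bulkload.copy.hfiles.perthread entries and submits one Copier task per chunk,
plus one more for the remainder. A standalone sketch of just that chunking step, with the actual
file copy replaced by returning the sublists (illustrative only):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Sketch of splitting a list of hfile names into per-task chunks. */
public class CopyChunkingSketch {
  static List<List<String>> chunk(List<String> hfiles, int perThread) {
    List<List<String>> chunks = new ArrayList<>();
    int copied = 0;
    while (hfiles.size() > copied + perThread) {  // full-sized chunks first
      chunks.add(hfiles.subList(copied, copied + perThread));
      copied += perThread;
    }
    if (hfiles.size() - copied > 0) {             // then whatever is left over
      chunks.add(hfiles.subList(copied, hfiles.size()));
    }
    return chunks;
  }

  public static void main(String[] args) {
    List<String> hfiles = Arrays.asList("a", "b", "c", "d", "e");
    System.out.println(chunk(hfiles, 2));         // [[a, b], [c, d], [e]]
  }
}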

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index 37dc1dd..f308daf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -47,7 +47,7 @@ public class MetricsSink {
     if (lastTimestampForAge != timestamp) {
       lastTimestampForAge = timestamp;
       age = System.currentTimeMillis() - lastTimestampForAge;
-    } 
+    }
     mss.setLastAppliedOpAge(age);
     return age;
   }
@@ -72,6 +72,17 @@ public class MetricsSink {
   }
 
   /**
+   * Convenience method to change metrics when a batch of operations is applied.
+   *
+   * @param batchSize total number of mutations that are applied/replicated
+   * @param hfileSize total number of hfiles that are applied/replicated
+   */
+  public void applyBatch(long batchSize, long hfileSize) {
+    applyBatch(batchSize);
+    mss.incrAppliedHFiles(hfileSize);
+  }
+
+  /**
    * Get the Age of Last Applied Op
    * @return ageOfLastAppliedOp
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index f9f7001..9687af7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -40,11 +40,13 @@ public class MetricsSource {
   // tracks last shipped timestamp for each wal group
   private Map<String, Long> lastTimeStamps = new HashMap<String, Long>();
   private int lastQueueSize = 0;
+  private long lastHFileRefsQueueSize = 0;
   private String id;
 
   private final MetricsReplicationSourceSource singleSourceSource;
   private final MetricsReplicationSourceSource globalSourceSource;
 
+
   /**
    * Constructor used to register the metrics
    *
@@ -143,6 +145,18 @@ public class MetricsSource {
     globalSourceSource.incrShippedKBs(sizeInKB);
   }
 
+  /**
+   * Convenience method to apply changes to metrics due to shipping a batch of logs.
+   *
+   * @param batchSize the size of the batch that was shipped to sinks.
+   * @param hfiles total number of hfiles shipped to sinks.
+   */
+  public void shipBatch(long batchSize, int sizeInKB, long hfiles) {
+    shipBatch(batchSize, sizeInKB);
+    singleSourceSource.incrHFilesShipped(hfiles);
+    globalSourceSource.incrHFilesShipped(hfiles);
+  }
+
   /** increase the byte number read by source from log file */
   public void incrLogReadInBytes(long readInBytes) {
     singleSourceSource.incrLogReadInBytes(readInBytes);
@@ -153,8 +167,10 @@ public class MetricsSource {
   public void clear() {
     singleSourceSource.clear();
     globalSourceSource.decrSizeOfLogQueue(lastQueueSize);
+    globalSourceSource.decrSizeOfHFileRefsQueue(lastHFileRefsQueueSize);
     lastTimeStamps.clear();
     lastQueueSize = 0;
+    lastHFileRefsQueueSize = 0;
   }
 
   /**
@@ -194,4 +210,19 @@ public class MetricsSource {
   public String getPeerID() {
     return id;
   }
+
+  public void incrSizeOfHFileRefsQueue(long size) {
+    singleSourceSource.incrSizeOfHFileRefsQueue(size);
+    globalSourceSource.incrSizeOfHFileRefsQueue(size);
+    lastHFileRefsQueueSize = size;
+  }
+
+  public void decrSizeOfHFileRefsQueue(int size) {
+    singleSourceSource.decrSizeOfHFileRefsQueue(size);
+    globalSourceSource.decrSizeOfHFileRefsQueue(size);
+    lastHFileRefsQueueSize -= size;
+    if (lastHFileRefsQueueSize < 0) {
+      lastHFileRefsQueueSize = 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index b3db0f6..30153f8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -649,8 +649,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
 
         // set the region name for the target region replica
         Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
-            ReplicationProtbufUtil.buildReplicateWALEntryRequest(
-              entriesArray, location.getRegionInfo().getEncodedNameAsBytes());
+            ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
+                .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
         try {
           PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
           controller.setCallTimeout(timeout);

http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index b396dfc..d2a0776 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -44,7 +45,10 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -55,6 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationTracker;
+import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
 import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -71,6 +76,7 @@ public class Replication extends WALActionsListener.Base implements
   private static final Log LOG =
       LogFactory.getLog(Replication.class);
   private boolean replication;
+  private boolean replicationForBulkLoadData;
   private ReplicationSourceManager replicationManager;
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
@@ -84,7 +90,6 @@ public class Replication extends WALActionsListener.Base implements
   private int statsThreadPeriod;
   // ReplicationLoad to access replication metrics
   private ReplicationLoad replicationLoad;
-
   /**
    * Instantiate the replication management (if rep is enabled).
    * @param server Hosting server
@@ -109,11 +114,20 @@ public class Replication extends WALActionsListener.Base implements
     this.server = server;
     this.conf = this.server.getConfiguration();
     this.replication = isReplication(this.conf);
+    this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
       new ThreadFactoryBuilder()
         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
         .setDaemon(true)
         .build());
+    if (this.replicationForBulkLoadData) {
+      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
+          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
+        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
+            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
+            + " is set to true.");
+      }
+    }
     if (replication) {
       try {
         this.replicationQueues =
@@ -158,6 +172,15 @@ public class Replication extends WALActionsListener.Base implements
     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
   }
 
+  /**
+   * @param c Configuration to look at
+   * @return True if replication for bulk load data is enabled.
+   */
+  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
+    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
+      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
+  }
+
    /*
     * Returns an object to listen to new wal changes
     **/
@@ -187,14 +210,22 @@ public class Replication extends WALActionsListener.Base implements
   /**
    * Carry on the list of log entries down to the sink
    * @param entries list of entries to replicate
-   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
-   * do not contain the Cells we are replicating; they are passed here on the side in this
-   * CellScanner).
+   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
+   *          contain the Cells we are replicating; they are passed here on the side in this
+   *          CellScanner).
+   * @param replicationClusterId Id which will uniquely identify source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+   *          directory required for replicating hfiles
+   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
    * @throws IOException
    */
-  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
+  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
+      String replicationClusterId, String sourceBaseNamespaceDirPath,
+      String sourceHFileArchiveDirPath) throws IOException {
     if (this.replication) {
-      this.replicationSink.replicateEntries(entries, cells);
+      this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
+        sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
     }
   }
 
@@ -226,34 +257,44 @@ public class Replication extends WALActionsListener.Base implements
   }
 
   @Override
-  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
-                                       WALEdit logEdit) {
-    scopeWALEdits(htd, logKey, logEdit);
+  public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey, WALEdit logEdit)
+      throws IOException {
+    scopeWALEdits(htd, logKey, logEdit, this.conf, this.getReplicationManager());
   }
 
   /**
-   * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
-   * from compaction WAL edits and if the scope is local.
+   * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
+   * from compaction WAL edits or when the scope is local.
    * @param htd Descriptor used to find the scope to use
    * @param logKey Key that may get scoped according to its edits
    * @param logEdit Edits used to lookup the scopes
+   * @param conf The configuration, used to check whether replication of bulk loaded data is enabled
+   * @param replicationManager Manager used to add hfile references for bulk load events
+   * @throws IOException If failed to parse the WALEdit
    */
-  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
-                                   WALEdit logEdit) {
-    NavigableMap<byte[], Integer> scopes =
-        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+  public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey, WALEdit logEdit,
+      Configuration conf, ReplicationSourceManager replicationManager) throws IOException {
+    NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
     byte[] family;
+    boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
     for (Cell cell : logEdit.getCells()) {
-      family = CellUtil.cloneFamily(cell);
-      // This is expected and the KV should not be replicated
-      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
-      // Unexpected, has a tendency to happen in unit tests
-      assert htd.getFamily(family) != null;
+      if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+        if (replicationForBulkLoadEnabled && CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+          scopeBulkLoadEdits(htd, replicationManager, scopes, logKey.getTablename(), cell);
+        } else {
+          // Skip the flush/compaction/region events
+          continue;
+        }
+      } else {
+        family = CellUtil.cloneFamily(cell);
+        // Unexpected, has a tendency to happen in unit tests
+        assert htd.getFamily(family) != null;
 
-      int scope = htd.getFamily(family).getScope();
-      if (scope != REPLICATION_SCOPE_LOCAL &&
-          !scopes.containsKey(family)) {
-        scopes.put(family, scope);
+        if (!scopes.containsKey(family)) {
+          int scope = htd.getFamily(family).getScope();
+          if (scope != REPLICATION_SCOPE_LOCAL) {
+            scopes.put(family, scope);
+          }
+        }
       }
     }
     if (!scopes.isEmpty()) {
@@ -261,6 +302,40 @@ public class Replication extends WALActionsListener.Base implements
     }
   }
 
+  private static void scopeBulkLoadEdits(HTableDescriptor htd,
+      ReplicationSourceManager replicationManager, NavigableMap<byte[], Integer> scopes,
+      TableName tableName, Cell cell) throws IOException {
+    byte[] family;
+    try {
+      BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+      for (StoreDescriptor s : bld.getStoresList()) {
+        family = s.getFamilyName().toByteArray();
+        if (!scopes.containsKey(family)) {
+          int scope = htd.getFamily(family).getScope();
+          if (scope != REPLICATION_SCOPE_LOCAL) {
+            scopes.put(family, scope);
+            addHFileRefsToQueue(replicationManager, tableName, family, s);
+          }
+        } else {
+          addHFileRefsToQueue(replicationManager, tableName, family, s);
+        }
+      }
+    } catch (IOException e) {
+      LOG.error("Failed to get bulk load events information from the wal file.", e);
+      throw e;
+    }
+  }
+
+  private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
+      TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
+    try {
+      replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
+    } catch (ReplicationException e) {
+      LOG.error("Failed to create hfile references in ZK.", e);
+      throw new IOException(e);
+    }
+  }
+
   @Override
   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
     getReplicationManager().preLogRoll(newPath);
@@ -272,8 +347,7 @@ public class Replication extends WALActionsListener.Base implements
   }
 
   /**
-   * This method modifies the master's configuration in order to inject
-   * replication-related features
+   * This method modifies the master's configuration in order to inject replication-related features
    * @param conf
    */
   public static void decorateMasterConfiguration(Configuration conf) {
@@ -285,6 +359,13 @@ public class Replication extends WALActionsListener.Base implements
     if (!plugins.contains(cleanerClass)) {
       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
     }
+    if (isReplicationForBulkLoadDataEnabled(conf)) {
+      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
+      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
+      if (!plugins.contains(cleanerClass)) {
+        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
+      }
+    }
   }
 
   /*

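The new bulk load path in Replication above is driven entirely by two HConstants properties: the bulk load enable switch and the replication cluster id that the constructor validates. A minimal sketch of turning the feature on, assuming only the constants that appear in this patch (the "source-cluster-1" id is a placeholder value):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.replication.regionserver.Replication;

public class BulkLoadReplicationConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Enable replication of bulk loaded data ...
    conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
    // ... and give this cluster an id; Replication's constructor above rejects an empty value.
    conf.set(HConstants.REPLICATION_CLUSTER_ID, "source-cluster-1"); // placeholder id
    System.out.println(Replication.isReplicationForBulkLoadDataEnabled(conf)); // prints true
  }
}
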
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index f10f5e3..9e7b3af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -33,15 +33,16 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -51,6 +52,11 @@ import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
 /**
  * <p>
@@ -78,6 +84,9 @@ public class ReplicationSink {
   private final MetricsSink metrics;
   private final AtomicLong totalReplicatedEdits = new AtomicLong();
   private final Object sharedHtableConLock = new Object();
+  // Number of hfiles that we successfully replicated
+  private long hfilesReplicated = 0;
+  private SourceFSConfigurationProvider provider;
 
   /**
    * Create a sink for replication
@@ -91,6 +100,18 @@ public class ReplicationSink {
     this.conf = HBaseConfiguration.create(conf);
     decorateConf();
     this.metrics = new MetricsSink();
+
+    String className =
+        conf.get("hbase.replication.source.fs.conf.provider",
+          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
+    try {
+      @SuppressWarnings("rawtypes")
+      Class c = Class.forName(className);
+      this.provider = (SourceFSConfigurationProvider) c.newInstance();
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Configured source fs configuration provider class "
+          + className + " could not be instantiated.", e);
+    }
   }
 
   /**
@@ -113,9 +134,16 @@ public class ReplicationSink {
    * operates against raw protobuf type saving on a conversion from pb to pojo.
    * @param entries
    * @param cells
-   * @throws IOException
+   * @param replicationClusterId Id that uniquely identifies the source cluster FS client
+   *          configurations in the replication configuration directory
+   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
+   *          directory
+   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
+   * @throws IOException If failed to replicate the data
    */
-  public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
+  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
+      String replicationClusterId, String sourceBaseNamespaceDirPath,
+      String sourceHFileArchiveDirPath) throws IOException {
     if (entries.isEmpty()) return;
     if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
     // Very simple optimization where we batch sequences of rows going
@@ -126,6 +154,10 @@ public class ReplicationSink {
       // invocation of this method per table and cluster id.
       Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
           new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
+
+      // Map of table name to a list of (family, list of hfile paths) pairs; the paths are
+      // relative to the table's namespace directory
+      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;
+
       for (WALEntry entry : entries) {
         TableName table =
             TableName.valueOf(entry.getKey().getTableName().toByteArray());
@@ -138,33 +170,60 @@ public class ReplicationSink {
             throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
           }
           Cell cell = cells.current();
-          if (isNewRowOrType(previousCell, cell)) {
-            // Create new mutation
-            m = CellUtil.isDelete(cell)?
-              new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
-              new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
-            List<UUID> clusterIds = new ArrayList<UUID>();
-            for(HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()){
-              clusterIds.add(toUUID(clusterId));
+          // Handle bulk load hfiles replication
+          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+            if (bulkLoadHFileMap == null) {
+              bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
             }
-            m.setClusterIds(clusterIds);
-            addToHashMultiMap(rowMap, table, clusterIds, m);
-          }
-          if (CellUtil.isDelete(cell)) {
-            ((Delete)m).addDeleteMarker(cell);
+            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
           } else {
-            ((Put)m).add(cell);
+            // Handle wal replication
+            if (isNewRowOrType(previousCell, cell)) {
+              // Create new mutation
+              m =
+                  CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
+                      cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
+                      cell.getRowLength());
+              List<UUID> clusterIds = new ArrayList<UUID>();
+              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
+                clusterIds.add(toUUID(clusterId));
+              }
+              m.setClusterIds(clusterIds);
+              addToHashMultiMap(rowMap, table, clusterIds, m);
+            }
+            if (CellUtil.isDelete(cell)) {
+              ((Delete) m).addDeleteMarker(cell);
+            } else {
+              ((Put) m).add(cell);
+            }
+            previousCell = cell;
           }
-          previousCell = cell;
         }
         totalReplicated++;
       }
-      for (Entry<TableName, Map<List<UUID>,List<Row>>> entry : rowMap.entrySet()) {
-        batch(entry.getKey(), entry.getValue().values());
+
+      // TODO Replicating mutations and bulk loaded data can be made parallel
+      if (!rowMap.isEmpty()) {
+        LOG.debug("Started replicating mutations.");
+        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
+          batch(entry.getKey(), entry.getValue().values());
+        }
+        LOG.debug("Finished replicating mutations.");
+      }
+
+      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
+        LOG.debug("Started replicating bulk loaded data.");
+        HFileReplicator hFileReplicator =
+            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
+                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
+                getConnection());
+        hFileReplicator.replicate();
+        LOG.debug("Finished replicating bulk loaded data.");
       }
+
       int size = entries.size();
       this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
-      this.metrics.applyBatch(size);
+      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
       this.totalReplicatedEdits.addAndGet(totalReplicated);
     } catch (IOException ex) {
       LOG.error("Unable to accept edit because:", ex);
@@ -172,6 +231,76 @@ public class ReplicationSink {
     }
   }
 
+  private void buildBulkLoadHFileMap(
+      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
+      Cell cell) throws IOException {
+    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+    List<StoreDescriptor> storesList = bld.getStoresList();
+    int storesSize = storesList.size();
+    for (int j = 0; j < storesSize; j++) {
+      StoreDescriptor storeDescriptor = storesList.get(j);
+      List<String> storeFileList = storeDescriptor.getStoreFileList();
+      int storeFilesSize = storeFileList.size();
+      hfilesReplicated += storeFilesSize;
+      for (int k = 0; k < storeFilesSize; k++) {
+        byte[] family = storeDescriptor.getFamilyName().toByteArray();
+
+        // Build hfile relative path from its namespace
+        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);
+
+        String tableName = table.getNameWithNamespaceInclAsString();
+        if (bulkLoadHFileMap.containsKey(tableName)) {
+          List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
+          boolean foundFamily = false;
+          for (int i = 0; i < familyHFilePathsList.size(); i++) {
+            Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
+            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
+              // Found family already present, just add the path to the existing list
+              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
+              foundFamily = true;
+              break;
+            }
+          }
+          if (!foundFamily) {
+            // Family not found, add this family and its hfile paths pair to the list
+            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
+          }
+        } else {
+          // Add this table entry into the map
+          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
+        }
+      }
+    }
+  }
+
+  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
+      List<Pair<byte[], List<String>>> familyHFilePathsList) {
+    List<String> hfilePaths = new ArrayList<String>();
+    hfilePaths.add(pathToHfileFromNS);
+    familyHFilePathsList.add(new Pair<byte[], List<String>>(family, hfilePaths));
+  }
+
+  private void addNewTableEntryInMap(
+      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
+      String pathToHfileFromNS, String tableName) {
+    List<String> hfilePaths = new ArrayList<String>();
+    hfilePaths.add(pathToHfileFromNS);
+    Pair<byte[], List<String>> newFamilyHFilePathsPair =
+        new Pair<byte[], List<String>>(family, hfilePaths);
+    List<Pair<byte[], List<String>>> newFamilyHFilePathsList =
+        new ArrayList<Pair<byte[], List<String>>>();
+    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
+    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
+  }
+
+  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
+      byte[] family) {
+    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
+        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
+        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
+        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
+  }
+
   /**
    * @param previousCell
    * @param cell
@@ -241,22 +370,13 @@ public class ReplicationSink {
     }
     Table table = null;
     try {
-      // See https://en.wikipedia.org/wiki/Double-checked_locking
-      Connection connection = this.sharedHtableCon;
-      if (connection == null) {
-        synchronized (sharedHtableConLock) {
-          connection = this.sharedHtableCon;
-          if (connection == null) {
-            connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
-          }
-        }
-      }
+      Connection connection = getConnection();
       table = connection.getTable(tableName);
       for (List<Row> rows : allRows) {
         table.batch(rows, null);
       }
     } catch (InterruptedException ix) {
-      throw (InterruptedIOException)new InterruptedIOException().initCause(ix);
+      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
     } finally {
       if (table != null) {
         table.close();
@@ -264,6 +384,20 @@ public class ReplicationSink {
     }
   }
 
+  private Connection getConnection() throws IOException {
+    // See https://en.wikipedia.org/wiki/Double-checked_locking
+    Connection connection = sharedHtableCon;
+    if (connection == null) {
+      synchronized (sharedHtableConLock) {
+        connection = sharedHtableCon;
+        if (connection == null) {
+          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
+        }
+      }
+    }
+    return connection;
+  }
+
   /**
    * Get a string representation of this sink's metrics
    * @return string with the total replicated edits count and the date

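ReplicationSink now resolves the source cluster's file system configuration through the pluggable "hbase.replication.source.fs.conf.provider" property and calls provider.getConf(sinkConf, replicationClusterId) when shipping bulk loaded hfiles. The sketch below is a hypothetical provider (not the shipped DefaultSourceFSConfigurationProvider); the interface shape is inferred from that call site, and the class only needs a no-arg constructor because the sink instantiates it reflectively:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.regionserver.SourceFSConfigurationProvider;

public class StaticSourceFSConfigurationProvider implements SourceFSConfigurationProvider {
  // Hand the sink's own configuration back for every source cluster id; a real provider
  // would load the source cluster's hdfs/hbase client configuration instead.
  @Override
  public Configuration getConf(Configuration sinkConf, String replicationClusterId) {
    return sinkConf;
  }
}

// Wiring it in on the sink cluster (sketch):
// conf.set("hbase.replication.source.fs.conf.provider",
//     StaticSourceFSConfigurationProvider.class.getCanonicalName());
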
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3d99523..868ddee 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -37,7 +37,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -47,9 +46,10 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.wal.DefaultWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
@@ -59,8 +59,12 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.DefaultWALProvider;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALKey;
 
 import com.google.common.collect.Lists;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -223,6 +227,34 @@ public class ReplicationSource extends Thread
     }
   }
 
+  @Override
+  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+      throws ReplicationException {
+    String peerId = peerClusterZnode;
+    if (peerId.contains("-")) {
+      // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+      // A peerId will not have "-" in its name, see HBASE-11394
+      peerId = peerClusterZnode.split("-")[0];
+    }
+    Map<TableName, List<String>> tableCFMap = replicationPeers.getPeer(peerId).getTableCFs();
+    if (tableCFMap != null) {
+      List<String> tableCfs = tableCFMap.get(tableName);
+      if (tableCFMap.containsKey(tableName)
+          && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+        this.replicationQueues.addHFileRefs(peerId, files);
+        metrics.incrSizeOfHFileRefsQueue(files.size());
+      } else {
+        LOG.debug("Skipping replication of HFiles belonging to table " + tableName + ", family "
+            + Bytes.toString(family) + ", for peer id " + peerId);
+      }
+    } else {
+      // No table CFs have been defined for this peer, which means all the data should be
+      // replicated
+      this.replicationQueues.addHFileRefs(peerId, files);
+      metrics.incrSizeOfHFileRefsQueue(files.size());
+    }
+  }
+
   private void uninitialize() {
     LOG.debug("Source exiting " + this.peerId);
     metrics.clear();
@@ -471,6 +503,8 @@ public class ReplicationSource extends Thread
     private int currentSize = 0;
     // Indicates whether this particular worker is running
     private boolean workerRunning = true;
+    // Current number of hfiles that we need to replicate
+    private long currentNbHFiles = 0;
 
     public ReplicationSourceWorkerThread(String walGroupId,
         PriorityBlockingQueue<Path> queue, ReplicationQueueInfo replicationQueueInfo, ReplicationSource source) {
@@ -550,6 +584,7 @@ public class ReplicationSource extends Thread
 
         boolean gotIOE = false;
         currentNbOperations = 0;
+        currentNbHFiles = 0;
         List<WAL.Entry> entries = new ArrayList<WAL.Entry>(1);
         currentSize = 0;
         try {
@@ -701,6 +736,28 @@ public class ReplicationSource extends Thread
       return seenEntries == 0 && processEndOfFile();
     }
 
+    private void cleanUpHFileRefs(WALEdit edit) throws IOException {
+      String peerId = peerClusterZnode;
+      if (peerId.contains("-")) {
+        // peerClusterZnode will be in the form peerId + "-" + rsZNode.
+        // A peerId will not have "-" in its name, see HBASE-11394
+        peerId = peerClusterZnode.split("-")[0];
+      }
+      List<Cell> cells = edit.getCells();
+      for (int i = 0; i < cells.size(); i++) {
+        Cell cell = cells.get(i);
+        if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+          BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
+          List<StoreDescriptor> stores = bld.getStoresList();
+          for (int j = 0; j < stores.size(); j++) {
+            List<String> storeFileList = stores.get(j).getStoreFileList();
+            manager.cleanUpHFileRefs(peerId, storeFileList);
+            metrics.decrSizeOfHFileRefsQueue(storeFileList.size());
+          }
+        }
+      }
+    }
+
     /**
      * Poll for the next path
      * @return true if a path was obtained, false if not
@@ -853,14 +910,31 @@ public class ReplicationSource extends Thread
     private int countDistinctRowKeys(WALEdit edit) {
       List<Cell> cells = edit.getCells();
       int distinctRowKeys = 1;
+      int totalHFileEntries = 0;
       Cell lastCell = cells.get(0);
+
       for (int i = 0; i < edit.size(); i++) {
+        // Count HFiles to be replicated
+        if (CellUtil.matchingQualifier(cells.get(i), WALEdit.BULK_LOAD)) {
+          try {
+            BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cells.get(i));
+            List<StoreDescriptor> stores = bld.getStoresList();
+            for (int j = 0; j < stores.size(); j++) {
+              totalHFileEntries += stores.get(j).getStoreFileList().size();
+            }
+          } catch (IOException e) {
+            LOG.error("Failed to deserialize bulk load entry from wal edit. "
+                + "Hence its hfile count will not be added to the metric.", e);
+          }
+        }
+
         if (!CellUtil.matchingRows(cells.get(i), lastCell)) {
           distinctRowKeys++;
         }
         lastCell = cells.get(i);
       }
-      return distinctRowKeys;
+      currentNbHFiles += totalHFileEntries;
+      return distinctRowKeys + totalHFileEntries;
     }
 
     /**
@@ -914,6 +988,12 @@ public class ReplicationSource extends Thread
           }
 
           if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
+            //Clean up hfile references
+            int size = entries.size();
+            for (int i = 0; i < size; i++) {
+              cleanUpHFileRefs(entries.get(i).getEdit());
+            }
+            //Log and clean up WAL logs
             manager.logPositionAndCleanOldLogs(this.currentPath, peerClusterZnode,
               this.repLogReader.getPosition(), this.replicationQueueInfo.isQueueRecovered(),
               currentWALisBeingWrittenTo);
@@ -925,7 +1005,7 @@ public class ReplicationSource extends Thread
           totalReplicatedEdits.addAndGet(entries.size());
           totalReplicatedOperations.addAndGet(currentNbOperations);
           // FIXME check relationship between wal group and overall
-          metrics.shipBatch(currentNbOperations, currentSize / 1024);
+          metrics.shipBatch(currentNbOperations, currentSize / 1024, currentNbHFiles);
           metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
             walGroupId);
           if (LOG.isTraceEnabled()) {

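Both addHFileRefs and the worker's cleanUpHFileRefs above derive the peer id from peerClusterZnode with the same split logic: a recovered queue's znode is named peerId + "-" + rsZNode, and a peer id itself never contains "-" (see HBASE-11394). A hypothetical helper capturing that derivation, shown only to make the convention explicit:

  // Sketch, not part of the patch. Returns the peer id for either a normal queue znode
  // ("<peerId>") or a recovered queue znode ("<peerId>-<rsZNode>").
  private static String extractPeerId(String peerClusterZnode) {
    return peerClusterZnode.contains("-") ? peerClusterZnode.split("-")[0] : peerClusterZnode;
  }
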
http://git-wip-us.apache.org/repos/asf/hbase/blob/26ac60b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 1e9c714..7f4a9f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.UUID;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -26,7 +27,9 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 
@@ -105,4 +108,14 @@ public interface ReplicationSourceInterface {
    */
   String getStats();
 
+  /**
+   * Add hfile names to the queue to be replicated.
+   * @param tableName Name of the table these files belong to
+   * @param family Name of the family these files belong to
+   * @param files Files whose names need to be added to the queue to be replicated
+   * @throws ReplicationException If failed to add hfile references
+   */
+  void addHFileRefs(TableName tableName, byte[] family, List<String> files)
+      throws ReplicationException;
+
 }
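
Adding addHFileRefs to ReplicationSourceInterface means every implementation, including test doubles, must now provide it. A hypothetical fragment of such an implementing class (not part of the patch) that opts out of bulk load tracking:

  // Sketch: a source that does not track bulk loaded hfiles can satisfy the new contract
  // with a no-op; ReplicationSource above shows the real queue-backed implementation.
  @Override
  public void addHFileRefs(TableName tableName, byte[] family, List<String> files)
      throws ReplicationException {
    // Intentionally empty: nothing to queue for this source.
  }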