You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/09/03 12:04:42 UTC

[1/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated

Repository: hbase
Updated Branches:
  refs/heads/master 7c51d3f2e -> 9e53f2927


http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0fe79d1..199c2c1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -60,13 +60,13 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
new file mode 100644
index 0000000..3ebda29
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/HadoopSecurityEnabledUserProviderForTesting.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.security;
+
+import org.apache.hadoop.hbase.security.UserProvider;
+
+/**
+ * A {@link UserProvider} that always says hadoop security is enabled, regardless of the underlying
+ * configuration. HBase security is <i>not enabled</i> as this is used to determine if SASL is used
+ * to do the authentication, which requires a Kerberos ticket (which we currently don't have in
+ * tests).
+ * <p>
+ * This should only be used for <b>TESTING</b>.
+ */
+public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider {
+
+  @Override
+  public boolean isHBaseSecurityEnabled() {
+    return false;
+  }
+
+  @Override
+  public boolean isHadoopSecurityEnabled() {
+    return true;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 6583366..1e38179 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -25,6 +25,12 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.protobuf.BlockingRpcChannel;
+import com.google.protobuf.RpcCallback;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -93,7 +99,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.locking.LockProcedure;
@@ -118,11 +123,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.Permission.Action;
-import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.log4j.Level;
@@ -134,11 +137,9 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
-import com.google.protobuf.BlockingRpcChannel;
-import com.google.protobuf.RpcCallback;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.Service;
-import com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProcedureProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos.ProcedureState;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 
 /**
  * Performs authorization checks for common operations, according to different

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
new file mode 100644
index 0000000..3f7d441
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/MapreduceTestingShim.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
+
+/**
+ * This class provides shims for HBase to interact with the Hadoop 1.0.x and the
+ * Hadoop 0.23.x series.
+ *
+ * NOTE: No testing done against 0.22.x, or 0.21.x.
+ */
+abstract public class MapreduceTestingShim {
+  private static MapreduceTestingShim instance;
+  private static Class[] emptyParam = new Class[] {};
+
+  static {
+    try {
+      // This class exists in hadoop 0.22+ but not in Hadoop 20.x/1.x
+      Class c = Class
+          .forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl");
+      instance = new MapreduceV2Shim();
+    } catch (Exception e) {
+      instance = new MapreduceV1Shim();
+    }
+  }
+
+  abstract public JobContext newJobContext(Configuration jobConf)
+      throws IOException;
+
+  abstract public Job newJob(Configuration conf) throws IOException;
+
+  abstract public JobConf obtainJobConf(MiniMRCluster cluster);
+
+  abstract public String obtainMROutputDirProp();
+
+  public static JobContext createJobContext(Configuration jobConf)
+      throws IOException {
+    return instance.newJobContext(jobConf);
+  }
+
+  public static JobConf getJobConf(MiniMRCluster cluster) {
+    return instance.obtainJobConf(cluster);
+  }
+
+  public static Job createJob(Configuration conf) throws IOException {
+    return instance.newJob(conf);
+  }
+
+  public static String getMROutputDirProp() {
+    return instance.obtainMROutputDirProp();
+  }
+
+  private static class MapreduceV1Shim extends MapreduceTestingShim {
+    public JobContext newJobContext(Configuration jobConf) throws IOException {
+      // Implementing:
+      // return new JobContext(jobConf, new JobID());
+      JobID jobId = new JobID();
+      Constructor<JobContext> c;
+      try {
+        c = JobContext.class.getConstructor(Configuration.class, JobID.class);
+        return c.newInstance(jobConf, jobId);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "Failed to instantiate new JobContext(jobConf, new JobID())", e);
+      }
+    }
+
+    @Override
+    public Job newJob(Configuration conf) throws IOException {
+      // Implementing:
+      // return new Job(conf);
+      Constructor<Job> c;
+      try {
+        c = Job.class.getConstructor(Configuration.class);
+        return c.newInstance(conf);
+      } catch (Exception e) {
+        throw new IllegalStateException(
+            "Failed to instantiate new Job(conf)", e);
+      }
+    }
+
+    public JobConf obtainJobConf(MiniMRCluster cluster) {
+      if (cluster == null) return null;
+      try {
+        Object runner = cluster.getJobTrackerRunner();
+        Method meth = runner.getClass().getDeclaredMethod("getJobTracker", emptyParam);
+        Object tracker = meth.invoke(runner, new Object []{});
+        Method m = tracker.getClass().getDeclaredMethod("getConf", emptyParam);
+        return (JobConf) m.invoke(tracker, new Object []{});
+      } catch (NoSuchMethodException nsme) {
+        return null;
+      } catch (InvocationTargetException ite) {
+        return null;
+      } catch (IllegalAccessException iae) {
+        return null;
+      }
+    }
+
+    @Override
+    public String obtainMROutputDirProp() {
+      return "mapred.output.dir";
+    }
+  };
+
+  private static class MapreduceV2Shim extends MapreduceTestingShim {
+    public JobContext newJobContext(Configuration jobConf) {
+      return newJob(jobConf);
+    }
+
+    @Override
+    public Job newJob(Configuration jobConf) {
+      // Implementing:
+      // return Job.getInstance(jobConf);
+      try {
+        Method m = Job.class.getMethod("getInstance", Configuration.class);
+        return (Job) m.invoke(null, jobConf); // static method, then arg
+      } catch (Exception e) {
+        e.printStackTrace();
+        throw new IllegalStateException(
+            "Failed to return from Job.getInstance(jobConf)");
+      }
+    }
+
+    public JobConf obtainJobConf(MiniMRCluster cluster) {
+      try {
+        Method meth = MiniMRCluster.class.getMethod("getJobTrackerConf", emptyParam);
+        return (JobConf) meth.invoke(cluster, new Object []{});
+      } catch (NoSuchMethodException nsme) {
+        return null;
+      } catch (InvocationTargetException ite) {
+        return null;
+      } catch (IllegalAccessException iae) {
+        return null;
+      }
+    }
+
+    @Override
+    public String obtainMROutputDirProp() {
+      // This is a copy of o.a.h.mapreduce.lib.output.FileOutputFormat.OUTDIR
+      // from Hadoop 0.23.x.  If we use the source directly we break the hadoop 1.x compile.
+      return "mapreduce.output.fileoutputformat.outputdir";
+    }
+  };
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
new file mode 100644
index 0000000..7e4d40e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -0,0 +1,723 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+/**
+ * Test cases for the "load" half of the HFileOutputFormat bulk load functionality. These tests run
+ * faster than the full MR cluster tests in TestHFileOutputFormat
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestLoadIncrementalHFiles {
+  @Rule
+  public TestName tn = new TestName();
+
+  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
+  private static final byte[] FAMILY = Bytes.toBytes("myfam");
+  private static final String NAMESPACE = "bulkNS";
+
+  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
+  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
+
+  private static final byte[][] SPLIT_KEYS =
+      new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };
+
+  static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+      KeyValueCodecWithTags.class.getCanonicalName());
+    util.startMiniCluster();
+
+    setupNamespace();
+  }
+
+  protected static void setupNamespace() throws Exception {
+    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithMap() throws Exception {
+    runTest("testSimpleLoadWithMap", BloomType.NONE,
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
+      true);
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that fit snugly inside those regions
+   */
+  @Test(timeout = 120000)
+  public void testSimpleLoad() throws Exception {
+    runTest("testSimpleLoad", BloomType.NONE,
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
+  }
+
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithFileCopy() throws Exception {
+    String testName = tn.getMethodName();
+    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
+    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
+      false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
+      false, true);
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that cross the boundaries of those regions
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingLoad() throws Exception {
+    runTest("testRegionCrossingLoad", BloomType.NONE,
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+  }
+
+  /**
+   * Test loading into a column family that has a ROW bloom filter.
+   */
+  @Test(timeout = 60000)
+  public void testRegionCrossingRowBloom() throws Exception {
+    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+  }
+
+  /**
+   * Test loading into a column family that has a ROWCOL bloom filter.
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingRowColBloom() throws Exception {
+    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that have different region boundaries than
+   * the table pre-split.
+   */
+  @Test(timeout = 120000)
+  public void testSimpleHFileSplit() throws Exception {
+    runTest("testHFileSplit", BloomType.NONE,
+      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
+          new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that cross the boundaries and have
+   * different region boundaries than the table pre-split.
+   */
+  @Test(timeout = 60000)
+  public void testRegionCrossingHFileSplit() throws Exception {
+    testRegionCrossingHFileSplit(BloomType.NONE);
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROW bloom
+   * filter and a different region boundaries than the table pre-split.
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
+    testRegionCrossingHFileSplit(BloomType.ROW);
+  }
+
+  /**
+   * Test case that creates some regions and loads HFiles that cross the boundaries have a ROWCOL
+   * bloom filter and a different region boundaries than the table pre-split.
+   */
+  @Test(timeout = 120000)
+  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
+    testRegionCrossingHFileSplit(BloomType.ROWCOL);
+  }
+
+  @Test
+  public void testSplitALot() throws Exception {
+    runTest("testSplitALot", BloomType.NONE,
+      new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
+          Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
+          Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
+          Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
+          Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
+          Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
+  }
+
+  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
+    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
+      new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
+          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
+      new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
+          new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+  }
+
+  private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
+    return TableDescriptorBuilder.newBuilder(tableName)
+        .addColumnFamily(
+          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
+        .build();
+  }
+
+  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
+      throws Exception {
+    runTest(testName, bloomType, null, hfileRanges);
+  }
+
+  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges, boolean useMap)
+      throws Exception {
+    runTest(testName, bloomType, null, hfileRanges, useMap);
+  }
+
+  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges) throws Exception {
+    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
+  }
+
+  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, boolean useMap) throws Exception {
+    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
+    final boolean preCreateTable = tableSplitKeys != null;
+
+    // Run the test bulkloading the table to the default namespace
+    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
+    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+      useMap);
+
+    // Run the test bulkloading the table to the specified namespace
+    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
+    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
+      useMap);
+  }
+
+  private void runTest(String testName, TableName tableName, BloomType bloomType,
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
+      throws Exception {
+    TableDescriptor htd = buildHTD(tableName, bloomType);
+    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
+  }
+
+  public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
+      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
+      int initRowCount, int factor) throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(testName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path familyDir = new Path(dir, Bytes.toString(fam));
+
+    int hfileIdx = 0;
+    Map<byte[], List<Path>> map = null;
+    List<Path> list = null;
+    if (useMap || copyFiles) {
+      list = new ArrayList<>();
+    }
+    if (useMap) {
+      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      map.put(fam, list);
+    }
+    Path last = null;
+    for (byte[][] range : hfileRanges) {
+      byte[] from = range[0];
+      byte[] to = range[1];
+      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
+      if (useMap) {
+        last = path;
+        list.add(path);
+      }
+    }
+    int expectedRows = hfileIdx * factor;
+
+    TableName tableName = htd.getTableName();
+    if (!util.getAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
+      util.getAdmin().createTable(htd, tableSplitKeys);
+    }
+
+    Configuration conf = util.getConfiguration();
+    if (copyFiles) {
+      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+    }
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    String[] args = { dir.toString(), tableName.toString() };
+    if (useMap) {
+      if (deleteFile) {
+        fs.delete(last, true);
+      }
+      Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
+      if (deleteFile) {
+        expectedRows -= 1000;
+        for (LoadQueueItem item : loaded.keySet()) {
+          if (item.getFilePath().getName().equals(last.getName())) {
+            fail(last + " should be missing");
+          }
+        }
+      }
+    } else {
+      loader.run(args);
+    }
+
+    if (copyFiles) {
+      for (Path p : list) {
+        assertTrue(p + " should exist", fs.exists(p));
+      }
+    }
+
+    Table table = util.getConnection().getTable(tableName);
+    try {
+      assertEquals(initRowCount + expectedRows, util.countRows(table));
+    } finally {
+      table.close();
+    }
+
+    return expectedRows;
+  }
+
+  private void runTest(String testName, TableDescriptor htd, BloomType bloomType,
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+      boolean copyFiles) throws Exception {
+    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
+      useMap, true, copyFiles, 0, 1000);
+
+    final TableName tableName = htd.getTableName();
+    // verify staging folder has been cleaned up
+    Path stagingBasePath =
+        new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
+    FileSystem fs = util.getTestFileSystem();
+    if (fs.exists(stagingBasePath)) {
+      FileStatus[] files = fs.listStatus(stagingBasePath);
+      for (FileStatus file : files) {
+        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
+          file.getPath().getName() != "DONOTERASE");
+      }
+    }
+
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Test that tags survive through a bulk load that needs to split hfiles. This test depends on the
+   * "hbase.client.rpc.codec" = KeyValueCodecWithTags so that the client can get tags in the
+   * responses.
+   */
+  @Test(timeout = 60000)
+  public void testTagsSurviveBulkLoadSplit() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    // table has these split points
+    byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
+        Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
+
+    // creating an hfile that has values that span the split points.
+    byte[] from = Bytes.toBytes("ddd");
+    byte[] to = Bytes.toBytes("ooo");
+    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
+      new Path(familyDir, tn.getMethodName() + "_hfile"), FAMILY, QUALIFIER, from, to, 1000);
+    int expectedRows = 1000;
+
+    TableName tableName = TableName.valueOf(tn.getMethodName());
+    TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
+    util.getAdmin().createTable(htd, tableSplitKeys);
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String[] args = { dir.toString(), tableName.toString() };
+    loader.run(args);
+
+    Table table = util.getConnection().getTable(tableName);
+    try {
+      assertEquals(expectedRows, util.countRows(table));
+      HFileTestUtil.verifyTags(table);
+    } finally {
+      table.close();
+    }
+
+    util.deleteTable(tableName);
+  }
+
+  /**
+   * Test loading into a column family that does not exist.
+   */
+  @Test(timeout = 60000)
+  public void testNonexistentColumnFamilyLoad() throws Exception {
+    String testName = tn.getMethodName();
+    byte[][][] hFileRanges =
+        new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
+            new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
+
+    byte[] TABLE = Bytes.toBytes("mytable_" + testName);
+    // set real family name to upper case in purpose to simulate the case that
+    // family name in HFiles is invalid
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
+        .addColumnFamily(ColumnFamilyDescriptorBuilder
+            .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
+        .build();
+
+    try {
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
+      assertTrue("Loading into table with non-existent family should have failed", false);
+    } catch (Exception e) {
+      assertTrue("IOException expected", e instanceof IOException);
+      // further check whether the exception message is correct
+      String errMsg = e.getMessage();
+      assertTrue(
+        "Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY +
+            "], current message: [" + errMsg + "]",
+        errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
+    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
+  }
+
+  @Test(timeout = 120000)
+  public void testNonHfileFolder() throws Exception {
+    testNonHfileFolder("testNonHfileFolder", false);
+  }
+
+  /**
+   * Write a random data file and a non-file in a dir with a valid family name but not part of the
+   * table families. we should we able to bulkload without getting the unmatched family exception.
+   * HBASE-13037/HBASE-13227
+   */
+  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(tableName);
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"), FAMILY,
+      QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
+    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
+
+    final String NON_FAMILY_FOLDER = "_logs";
+    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
+    fs.mkdirs(nonFamilyDir);
+    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
+    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
+
+    Table table = null;
+    try {
+      if (preCreateTable) {
+        table = util.createTable(TableName.valueOf(tableName), FAMILY);
+      } else {
+        table = util.getConnection().getTable(TableName.valueOf(tableName));
+      }
+
+      final String[] args = { dir.toString(), tableName };
+      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
+      assertEquals(500, util.countRows(table));
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+      fs.delete(dir, true);
+    }
+  }
+
+  private static void createRandomDataFile(FileSystem fs, Path path, int size) throws IOException {
+    FSDataOutputStream stream = fs.create(path);
+    try {
+      byte[] data = new byte[1024];
+      for (int i = 0; i < data.length; ++i) {
+        data[i] = (byte) (i & 0xff);
+      }
+      while (size >= data.length) {
+        stream.write(data, 0, data.length);
+        size -= data.length;
+      }
+      if (size > 0) {
+        stream.write(data, 0, size);
+      }
+    } finally {
+      stream.close();
+    }
+  }
+
+  @Test(timeout = 120000)
+  public void testSplitStoreFile() throws IOException {
+    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
+    FileSystem fs = util.getTestFileSystem();
+    Path testIn = new Path(dir, "testhfile");
+    ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of(FAMILY);
+    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+      Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+
+    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
+      Bytes.toBytes("ggg"), bottomOut, topOut);
+
+    int rowCount = verifyHFile(bottomOut);
+    rowCount += verifyHFile(topOut);
+    assertEquals(1000, rowCount);
+  }
+
+  @Test
+  public void testSplitStoreFileWithNoneToNone() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
+  }
+
+  @Test
+  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
+  }
+
+  @Test
+  public void testSplitStoreFileWithEncodedToNone() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
+  }
+
+  @Test
+  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
+    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
+  }
+
+  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
+      DataBlockEncoding cfEncoding) throws IOException {
+    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
+    FileSystem fs = util.getTestFileSystem();
+    Path testIn = new Path(dir, "testhfile");
+    ColumnFamilyDescriptor familyDesc =
+        ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
+    HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
+      bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
+
+    Path bottomOut = new Path(dir, "bottom.out");
+    Path topOut = new Path(dir, "top.out");
+
+    LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
+      Bytes.toBytes("ggg"), bottomOut, topOut);
+
+    int rowCount = verifyHFile(bottomOut);
+    rowCount += verifyHFile(topOut);
+    assertEquals(1000, rowCount);
+  }
+
+  private int verifyHFile(Path p) throws IOException {
+    Configuration conf = util.getConfiguration();
+    HFile.Reader reader =
+        HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
+    reader.loadFileInfo();
+    HFileScanner scanner = reader.getScanner(false, false);
+    scanner.seekTo();
+    int count = 0;
+    do {
+      count++;
+    } while (scanner.next());
+    assertTrue(count > 0);
+    reader.close();
+    return count;
+  }
+
+  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
+    Integer value = map.containsKey(first) ? map.get(first) : 0;
+    map.put(first, value + 1);
+
+    value = map.containsKey(last) ? map.get(last) : 0;
+    map.put(last, value - 1);
+  }
+
+  @Test(timeout = 120000)
+  public void testInferBoundaries() {
+    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+
+    /*
+     * Toy example c---------i o------p s---------t v------x a------e g-----k m-------------q r----s
+     * u----w Should be inferred as: a-----------------k m-------------q r--------------t
+     * u---------x The output should be (m,r,u)
+     */
+
+    String first;
+    String last;
+
+    first = "a";
+    last = "e";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "r";
+    last = "s";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "o";
+    last = "p";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "g";
+    last = "k";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "v";
+    last = "x";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "c";
+    last = "i";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "m";
+    last = "q";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "s";
+    last = "t";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    first = "u";
+    last = "w";
+    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
+
+    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
+    byte[][] compare = new byte[3][];
+    compare[0] = "m".getBytes();
+    compare[1] = "r".getBytes();
+    compare[2] = "u".getBytes();
+
+    assertEquals(keysArray.length, 3);
+
+    for (int row = 0; row < keysArray.length; row++) {
+      assertArrayEquals(keysArray[row], compare[row]);
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testLoadTooMayHFiles() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+
+    byte[] from = Bytes.toBytes("begin");
+    byte[] to = Bytes.toBytes("end");
+    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
+      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_" + i),
+        FAMILY, QUALIFIER, from, to, 1000);
+    }
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" };
+    try {
+      loader.run(args);
+      fail("Bulk loading too many files should fail");
+    } catch (IOException ie) {
+      assertTrue(ie.getMessage()
+          .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
+    }
+  }
+
+  @Test(expected = TableNotFoundException.class)
+  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
+    Configuration conf = util.getConfiguration();
+    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    String[] args = { "directory", "nonExistingTable" };
+    loader.run(args);
+  }
+
+  @Test(timeout = 120000)
+  public void testTableWithCFNameStartWithUnderScore() throws Exception {
+    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
+    FileSystem fs = util.getTestFileSystem();
+    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
+    String family = "_cf";
+    Path familyDir = new Path(dir, family);
+
+    byte[] from = Bytes.toBytes("begin");
+    byte[] to = Bytes.toBytes("end");
+    Configuration conf = util.getConfiguration();
+    String tableName = tn.getMethodName();
+    Table table = util.createTable(TableName.valueOf(tableName), family);
+    HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
+      QUALIFIER, from, to, 1000);
+
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    String[] args = { dir.toString(), tableName };
+    try {
+      loader.run(args);
+      assertEquals(1000, util.countRows(table));
+    } finally {
+      if (null != table) {
+        table.close();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..414a6cb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,628 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.mockito.Mockito;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestLoadIncrementalHFilesSplitRecovery {
+  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
+
+  static HBaseTestingUtility util;
+  // used by secure subclass
+  static boolean useSecure = false;
+
+  final static int NUM_CFS = 10;
+  final static byte[] QUAL = Bytes.toBytes("qual");
+  final static int ROWCOUNT = 100;
+
+  private final static byte[][] families = new byte[NUM_CFS][];
+
+  @Rule
+  public TestName name = new TestName();
+
+  static {
+    for (int i = 0; i < NUM_CFS; i++) {
+      families[i] = Bytes.toBytes(family(i));
+    }
+  }
+
+  static byte[] rowkey(int i) {
+    return Bytes.toBytes(String.format("row_%08d", i));
+  }
+
+  static String family(int i) {
+    return String.format("family_%04d", i);
+  }
+
+  static byte[] value(int i) {
+    return Bytes.toBytes(String.format("%010d", i));
+  }
+
+  public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
+    byte[] val = value(value);
+    for (int i = 0; i < NUM_CFS; i++) {
+      Path testIn = new Path(dir, family(i));
+
+      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+        Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+    }
+  }
+
+  private TableDescriptor createTableDesc(TableName name, int cfs) {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
+    IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
+        .forEachOrdered(builder::addColumnFamily);
+    return builder.build();
+  }
+
+  /**
+   * Creates a table with given table name and specified number of column families if the table does
+   * not already exist.
+   */
+  private void setupTable(final Connection connection, TableName table, int cfs)
+      throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      try (Admin admin = connection.getAdmin()) {
+        admin.createTable(createTableDesc(table, cfs));
+      }
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  /**
+   * Creates a table with given table name,specified number of column families<br>
+   * and splitkeys if the table does not already exist.
+   * @param table
+   * @param cfs
+   * @param SPLIT_KEYS
+   */
+  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
+      throws IOException {
+    try {
+      LOG.info("Creating table " + table);
+      util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
+    } catch (TableExistsException tee) {
+      LOG.info("Table " + table + " already exists");
+    }
+  }
+
+  private Path buildBulkFiles(TableName table, int value) throws Exception {
+    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
+    Path bulk1 = new Path(dir, table.getNameAsString() + value);
+    FileSystem fs = util.getTestFileSystem();
+    buildHFiles(fs, bulk1, value);
+    return bulk1;
+  }
+
+  /**
+   * Populate table with known values.
+   */
+  private void populateTable(final Connection connection, TableName table, int value)
+      throws Exception {
+    // create HFiles for different column families
+    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+    Path bulk1 = buildBulkFiles(table, value);
+    try (Table t = connection.getTable(table);
+        RegionLocator locator = connection.getRegionLocator(table);
+        Admin admin = connection.getAdmin()) {
+      lih.doBulkLoad(bulk1, admin, t, locator);
+    }
+  }
+
+  /**
+   * Split the known table in half. (this is hard coded for this test suite)
+   */
+  private void forceSplit(TableName table) {
+    try {
+      // need to call regions server to by synchronous but isn't visible.
+      HRegionServer hrs = util.getRSForFirstRegionInTable(table);
+
+      for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+        if (hri.getTable().equals(table)) {
+          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
+          // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
+        }
+      }
+
+      // verify that split completed.
+      int regions;
+      do {
+        regions = 0;
+        for (HRegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+          if (hri.getTable().equals(table)) {
+            regions++;
+          }
+        }
+        if (regions != 2) {
+          LOG.info("Taking some time to complete split...");
+          Thread.sleep(250);
+        }
+      } while (regions != 2);
+    } catch (IOException e) {
+      e.printStackTrace();
+    } catch (InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    util.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void teardownCluster() throws Exception {
+    util.shutdownMiniCluster();
+  }
+
+  /**
+   * Checks that all columns have the expected value and that there is the expected number of rows.
+   * @throws IOException
+   */
+  void assertExpectedTable(TableName table, int count, int value) throws IOException {
+    List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
+    assertEquals(htds.size(), 1);
+    try (Table t = util.getConnection().getTable(table);
+        ResultScanner sr = t.getScanner(new Scan())) {
+      int i = 0;
+      for (Result r; (r = sr.next()) != null;) {
+        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+            .forEach(v -> assertArrayEquals(value(value), v));
+        i++;
+      }
+      assertEquals(count, i);
+    } catch (IOException e) {
+      fail("Failed due to exception");
+    }
+  }
+
+  /**
+   * Test that shows that exception thrown from the RS side will result in an exception on the
+   * LIHFile client.
+   */
+  @Test(expected = IOException.class, timeout = 120000)
+  public void testBulkLoadPhaseFailure() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    final AtomicInteger attmptedCalls = new AtomicInteger();
+    final AtomicInteger failedCalls = new AtomicInteger();
+    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, table, 10);
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+        @Override
+        protected List<LoadQueueItem> tryAtomicRegionLoad(
+            ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
+            Collection<LoadQueueItem> lqis) throws IOException {
+          int i = attmptedCalls.incrementAndGet();
+          if (i == 1) {
+            Connection errConn;
+            try {
+              errConn = getMockedConnection(util.getConfiguration());
+              serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
+            } catch (Exception e) {
+              LOG.fatal("mocking cruft, should never happen", e);
+              throw new RuntimeException("mocking cruft, should never happen");
+            }
+            failedCalls.incrementAndGet();
+            return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+          }
+
+          return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
+        }
+      };
+      try {
+        // create HFiles for different column families
+        Path dir = buildBulkFiles(table, 1);
+        try (Table t = connection.getTable(table);
+            RegionLocator locator = connection.getRegionLocator(table);
+            Admin admin = connection.getAdmin()) {
+          lih.doBulkLoad(dir, admin, t, locator);
+        }
+      } finally {
+        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+          HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+      }
+      fail("doBulkLoad should have thrown an exception");
+    }
+  }
+
+  /**
+   * Test that shows that exception thrown from the RS side will result in the expected number of
+   * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
+   * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set
+   */
+  @Test
+  public void testRetryOnIOException() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    final AtomicInteger calls = new AtomicInteger(1);
+    final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
+    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
+    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+      @Override
+      protected List<LoadQueueItem> tryAtomicRegionLoad(
+          ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
+          Collection<LoadQueueItem> lqis) throws IOException {
+        if (calls.getAndIncrement() < util.getConfiguration().getInt(
+          HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) -
+            1) {
+          ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
+              tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
+              HConstants.PRIORITY_UNSET) {
+            @Override
+            public byte[] rpcCall() throws Exception {
+              throw new IOException("Error calling something on RegionServer");
+            }
+          };
+          return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
+        } else {
+          return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
+        }
+      }
+    };
+    setupTable(conn, table, 10);
+    Path dir = buildBulkFiles(table, 1);
+    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
+    util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
+
+  }
+
+  private ClusterConnection getMockedConnection(final Configuration conf)
+      throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
+    ClusterConnection c = Mockito.mock(ClusterConnection.class);
+    Mockito.when(c.getConfiguration()).thenReturn(conf);
+    Mockito.doNothing().when(c).close();
+    // Make it so we return a particular location when asked.
+    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
+        ServerName.valueOf("example.org", 1234, 0));
+    Mockito.when(
+      c.getRegionLocation((TableName) Mockito.any(), (byte[]) Mockito.any(), Mockito.anyBoolean()))
+        .thenReturn(loc);
+    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).thenReturn(loc);
+    ClientProtos.ClientService.BlockingInterface hri =
+        Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
+    Mockito
+        .when(
+          hri.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
+        .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
+    Mockito.when(c.getClient(Mockito.any(ServerName.class))).thenReturn(hri);
+    return c;
+  }
+
+  /**
+   * This test exercises the path where there is a split after initial validation but before the
+   * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
+   * split just before the atomic region load.
+   */
+  @Test(timeout = 120000)
+  public void testSplitWhileBulkLoadPhase() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, table, 10);
+      populateTable(connection, table, 1);
+      assertExpectedTable(table, ROWCOUNT, 1);
+
+      // Now let's cause trouble. This will occur after checks and cause bulk
+      // files to fail when attempt to atomically import. This is recoverable.
+      final AtomicInteger attemptedCalls = new AtomicInteger();
+      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
+        @Override
+        protected void bulkLoadPhase(final Table htable, final Connection conn,
+            ExecutorService pool, Deque<LoadQueueItem> queue,
+            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
+            Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+          int i = attemptedCalls.incrementAndGet();
+          if (i == 1) {
+            // On first attempt force a split.
+            forceSplit(table);
+          }
+          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+        }
+      };
+
+      // create HFiles for different column families
+      try (Table t = connection.getTable(table);
+          RegionLocator locator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        Path bulk = buildBulkFiles(table, 2);
+        lih2.doBulkLoad(bulk, admin, t, locator);
+      }
+
+      // check that data was loaded
+      // The three expected attempts are 1) failure because need to split, 2)
+      // load of split top 3) load of split bottom
+      assertEquals(attemptedCalls.get(), 3);
+      assertExpectedTable(table, ROWCOUNT, 2);
+    }
+  }
+
+  /**
+   * This test splits a table and attempts to bulk load. The bulk import files should be split
+   * before atomically importing.
+   */
+  @Test(timeout = 120000)
+  public void testGroupOrSplitPresplit() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, table, 10);
+      populateTable(connection, table, 1);
+      assertExpectedTable(connection, table, ROWCOUNT, 1);
+      forceSplit(table);
+
+      final AtomicInteger countedLqis = new AtomicInteger();
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+        @Override
+        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+            final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+          Pair<List<LoadQueueItem>, String> lqis =
+              super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+          if (lqis != null && lqis.getFirst() != null) {
+            countedLqis.addAndGet(lqis.getFirst().size());
+          }
+          return lqis;
+        }
+      };
+
+      // create HFiles for different column families
+      Path bulk = buildBulkFiles(table, 2);
+      try (Table t = connection.getTable(table);
+          RegionLocator locator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(bulk, admin, t, locator);
+      }
+      assertExpectedTable(connection, table, ROWCOUNT, 2);
+      assertEquals(20, countedLqis.get());
+    }
+  }
+
+  /**
+   * This test creates a table with many small regions. The bulk load files would be splitted
+   * multiple times before all of them can be loaded successfully.
+   */
+  @Test(timeout = 120000)
+  public void testSplitTmpFileCleanUp() throws Exception {
+    final TableName table = TableName.valueOf(name.getMethodName());
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
+        Bytes.toBytes("row_00000050") };
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
+
+      // create HFiles
+      Path bulk = buildBulkFiles(table, 2);
+      try (Table t = connection.getTable(table);
+          RegionLocator locator = connection.getRegionLocator(table);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(bulk, admin, t, locator);
+      }
+      // family path
+      Path tmpPath = new Path(bulk, family(0));
+      // TMP_DIR under family path
+      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
+      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
+      // HFiles have been splitted, there is TMP_DIR
+      assertTrue(fs.exists(tmpPath));
+      // TMP_DIR should have been cleaned-up
+      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
+        FSUtils.listStatus(fs, tmpPath));
+      assertExpectedTable(connection, table, ROWCOUNT, 2);
+    }
+  }
+
+  /**
+   * This simulates an remote exception which should cause LIHF to exit with an exception.
+   */
+  @Test(expected = IOException.class, timeout = 120000)
+  public void testGroupOrSplitFailure() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
+      setupTable(connection, tableName, 10);
+
+      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
+        int i = 0;
+
+        @Override
+        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+            Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+            final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+          i++;
+
+          if (i == 5) {
+            throw new IOException("failure");
+          }
+          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
+        }
+      };
+
+      // create HFiles for different column families
+      Path dir = buildBulkFiles(tableName, 1);
+      try (Table t = connection.getTable(tableName);
+          RegionLocator locator = connection.getRegionLocator(tableName);
+          Admin admin = connection.getAdmin()) {
+        lih.doBulkLoad(dir, admin, t, locator);
+      }
+    }
+
+    fail("doBulkLoad should have thrown an exception");
+  }
+
+  @Test(timeout = 120000)
+  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
+    // Share connection. We were failing to find the table with our new reverse scan because it
+    // looks for first region, not any region -- that is how it works now. The below removes first
+    // region in test. Was reliant on the Connection caching having first region.
+    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
+    Table table = connection.getTable(tableName);
+
+    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
+    Path dir = buildBulkFiles(tableName, 2);
+
+    final AtomicInteger countedLqis = new AtomicInteger();
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
+
+      @Override
+      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+          Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
+          final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+        Pair<List<LoadQueueItem>, String> lqis =
+            super.groupOrSplit(regionGroups, item, htable, startEndKeys);
+        if (lqis != null && lqis.getFirst() != null) {
+          countedLqis.addAndGet(lqis.getFirst().size());
+        }
+        return lqis;
+      }
+    };
+
+    // do bulkload when there is no region hole in hbase:meta.
+    try (Table t = connection.getTable(tableName);
+        RegionLocator locator = connection.getRegionLocator(tableName);
+        Admin admin = connection.getAdmin()) {
+      loader.doBulkLoad(dir, admin, t, locator);
+    } catch (Exception e) {
+      LOG.error("exeception=", e);
+    }
+    // check if all the data are loaded into the table.
+    this.assertExpectedTable(tableName, ROWCOUNT, 2);
+
+    dir = buildBulkFiles(tableName, 3);
+
+    // Mess it up by leaving a hole in the hbase:meta
+    List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+    for (HRegionInfo regionInfo : regionInfos) {
+      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
+        MetaTableAccessor.deleteRegion(connection, regionInfo);
+        break;
+      }
+    }
+
+    try (Table t = connection.getTable(tableName);
+        RegionLocator locator = connection.getRegionLocator(tableName);
+        Admin admin = connection.getAdmin()) {
+      loader.doBulkLoad(dir, admin, t, locator);
+    } catch (Exception e) {
+      LOG.error("exception=", e);
+      assertTrue("IOException expected", e instanceof IOException);
+    }
+
+    table.close();
+
+    // Make sure at least the one region that still exists can be found.
+    regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
+    assertTrue(regionInfos.size() >= 1);
+
+    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
+    connection.close();
+  }
+
+  /**
+   * Checks that all columns have the expected value and that there is the expected number of rows.
+   * @throws IOException
+   */
+  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
+      throws IOException {
+    List<TableDescriptor> htds = util.getAdmin().listTableDescriptors(table.getNameAsString());
+    assertEquals(htds.size(), 1);
+    try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
+      int i = 0;
+      for (Result r; (r = sr.next()) != null;) {
+        r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+            .forEach(v -> assertArrayEquals(value(value), v));
+        i++;
+      }
+      assertEquals(count, i);
+    } catch (IOException e) {
+      fail("Failed due to exception");
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
new file mode 100644
index 0000000..3d4f4c6
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
@@ -0,0 +1,66 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode. This suite is unable
+ * to verify the security handoff/turnover as miniCluster is running as system user thus has root
+ * privileges and delegation tokens don't seem to work on miniDFS.
+ * <p>
+ * Thus SecureBulkload can only be completely verified by running integration tests against a secure
+ * cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
+ * supported as part of a LoadIncrementalFiles call.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles {
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // set the always on security provider
+    UserProvider.setUserProviderForTesting(util.getConfiguration(),
+      HadoopSecurityEnabledUserProviderForTesting.class);
+    // setup configuration
+    SecureTestUtil.enableSecurity(util.getConfiguration());
+    util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+      KeyValueCodecWithTags.class.getCanonicalName());
+
+    util.startMiniCluster();
+
+    // Wait for the ACL table to become available
+    util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+
+    setupNamespace();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
new file mode 100644
index 0000000..58fea9d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.security.access.SecureTestUtil;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Reruns TestSecureLoadIncrementalHFilesSplitRecovery using LoadIncrementalHFiles in secure mode.
+ * This suite is unable to verify the security handoff/turnove as miniCluster is running as system
+ * user thus has root privileges and delegation tokens don't seem to work on miniDFS.
+ * <p>
+ * Thus SecureBulkload can only be completely verified by running integration tests against a secure
+ * cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
+ * supported as part of a LoadIncrementalFiles call.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestSecureLoadIncrementalHFilesSplitRecovery
+    extends TestLoadIncrementalHFilesSplitRecovery {
+
+  // This "overrides" the parent static method
+  // make sure they are in sync
+  @BeforeClass
+  public static void setupCluster() throws Exception {
+    util = new HBaseTestingUtility();
+    // set the always on security provider
+    UserProvider.setUserProviderForTesting(util.getConfiguration(),
+      HadoopSecurityEnabledUserProviderForTesting.class);
+    // setup configuration
+    SecureTestUtil.enableSecurity(util.getConfiguration());
+
+    util.startMiniCluster();
+
+    // Wait for the ACL table to become available
+    util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
+  }
+
+  // Disabling this test as it does not work in secure mode
+  @Test(timeout = 180000)
+  @Override
+  public void testBulkLoadPhaseFailure() {
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
index f45c0b9..33fbb68 100644
--- a/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
+++ b/hbase-spark-it/src/test/java/org/apache/hadoop/hbase/spark/IntegrationTestSparkBulkLoad.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 
 import org.apache.hadoop.hbase.mapreduce.IntegrationTestBulkLoad;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
index 040546d..2adba32 100644
--- a/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
+++ b/hbase-spark/src/main/java/org/apache/hadoop/hbase/spark/example/hbasecontext/JavaHBaseBulkLoadExample.java
@@ -41,7 +41,7 @@ import java.util.List;
  *  path/to/hbase-spark.jar {path/to/output/HFiles}
  *
  * This example will output put hfiles in {path/to/output/HFiles}, and user can run
- * 'hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles' to load the HFiles into table to verify this example.
+ * 'hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles' to load the HFiles into table to verify this example.
  */
 final public class JavaHBaseBulkLoadExample {
   private JavaHBaseBulkLoadExample() {}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
index bfacbe8..e383b5e 100644
--- a/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
+++ b/hbase-spark/src/test/java/org/apache/hadoop/hbase/spark/TestJavaHBaseContext.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.spark.example.hbasecontext.JavaHBaseBulkDeleteExample;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
index d2b707e..a427327 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/BulkLoadSuite.scala
@@ -20,7 +20,7 @@ package org.apache.hadoop.hbase.spark
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hbase.client.{Get, ConnectionFactory}
 import org.apache.hadoop.hbase.io.hfile.{CacheConfig, HFile}
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles
 import org.apache.hadoop.hbase.{HConstants, CellUtil, HBaseTestingUtility, TableName}
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/src/main/asciidoc/_chapters/ops_mgt.adoc
----------------------------------------------------------------------
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index f96cd6c..6f7f9e0 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -577,7 +577,7 @@ There are two ways to invoke this utility, with explicit classname and via the d
 
 .Explicit Classname
 ----
-$ bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
+$ bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles <hdfs://storefileoutput> <tablename>
 ----
 
 .Driver


[3/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 7b4a353..285530d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -18,1288 +18,60 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
-import static java.lang.String.format;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.HashMultimap;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps;
-import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InterruptedIOException;
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Deque;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.HalfStoreFileReader;
-import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSHDFSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
 
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
+ * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Use
+ *             {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles} instead.
  */
+@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS",
+    justification = "Temporary glue. To be removed")
+@Deprecated
 @InterfaceAudience.Public
-public class LoadIncrementalHFiles extends Configured implements Tool {
-  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
-  private boolean initalized = false;
-
-  public static final String NAME = "completebulkload";
-  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
-  public static final String MAX_FILES_PER_REGION_PER_FAMILY
-    = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
-  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
-  public final static String CREATE_TABLE_CONF_KEY = "create.table";
-  public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
-  public final static String ALWAYS_COPY_FILES = "always.copy.files";
-
-  // We use a '.' prefix which is ignored when walking directory trees
-  // above. It is invalid family name.
-  final static String TMP_DIR = ".tmp";
-
-  private int maxFilesPerRegionPerFamily;
-  private boolean assignSeqIds;
-  private Set<String> unmatchedFamilies = new HashSet<>();
-
-  // Source filesystem
-  private FileSystem fs;
-  // Source delegation token
-  private FsDelegationToken fsDelegationToken;
-  private String bulkToken;
-  private UserProvider userProvider;
-  private int nrThreads;
-  private RpcControllerFactory rpcControllerFactory;
-  private AtomicInteger numRetries;
-
-  private Map<LoadQueueItem, ByteBuffer> retValue = null;
-
-  public LoadIncrementalHFiles(Configuration conf) throws Exception {
-    super(conf);
-    this.rpcControllerFactory = new RpcControllerFactory(conf);
-    initialize();
-  }
-
-  private void initialize() throws IOException {
-    if (initalized) {
-      return;
-    }
-    // make a copy, just to be sure we're not overriding someone else's config
-    setConf(HBaseConfiguration.create(getConf()));
-    Configuration conf = getConf();
-    // disable blockcache for tool invocation, see HBASE-10500
-    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
-    this.userProvider = UserProvider.instantiate(conf);
-    this.fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
-    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
-    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
-    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
-      Runtime.getRuntime().availableProcessors());
-    initalized = true;
-    numRetries = new AtomicInteger(1);
-  }
-
-  private void usage() {
-    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
-        + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
-        + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
-        + "\n");
-  }
-
-  private interface BulkHFileVisitor<TFamily> {
-    TFamily bulkFamily(final byte[] familyName)
-      throws IOException;
-    void bulkHFile(final TFamily family, final FileStatus hfileStatus)
-      throws IOException;
-  }
+public class LoadIncrementalHFiles extends org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {
 
   /**
-   * Iterate over the bulkDir hfiles.
-   * Skip reference, HFileLink, files starting with "_" and non-valid hfiles.
+   * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Use
+   *             {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem} instead.
    */
-  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
-    final BulkHFileVisitor<TFamily> visitor) throws IOException {
-    visitBulkHFiles(fs, bulkDir, visitor, true);
-  }
-
-  /**
-   * Iterate over the bulkDir hfiles.
-   * Skip reference, HFileLink, files starting with "_".
-   * Check and skip non-valid hfiles by default, or skip this validation by setting
-   * 'hbase.loadincremental.validate.hfile' to false.
-   */
-  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
-    final BulkHFileVisitor<TFamily> visitor, final boolean validateHFile) throws IOException {
-    if (!fs.exists(bulkDir)) {
-      throw new FileNotFoundException("Bulkload dir " + bulkDir + " not found");
-    }
-
-    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
-    if (familyDirStatuses == null) {
-      throw new FileNotFoundException("No families found in " + bulkDir);
-    }
-
-    for (FileStatus familyStat : familyDirStatuses) {
-      if (!familyStat.isDirectory()) {
-        LOG.warn("Skipping non-directory " + familyStat.getPath());
-        continue;
-      }
-      Path familyDir = familyStat.getPath();
-      byte[] familyName = familyDir.getName().getBytes();
-      // Skip invalid family
-      try {
-        HColumnDescriptor.isLegalFamilyName(familyName);
-      }
-      catch (IllegalArgumentException e) {
-        LOG.warn("Skipping invalid " + familyStat.getPath());
-        continue;
-      }
-      TFamily family = visitor.bulkFamily(familyName);
-
-      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
-      for (FileStatus hfileStatus : hfileStatuses) {
-        if (!fs.isFile(hfileStatus.getPath())) {
-          LOG.warn("Skipping non-file " + hfileStatus);
-          continue;
-        }
-
-        Path hfile = hfileStatus.getPath();
-        // Skip "_", reference, HFileLink
-        String fileName = hfile.getName();
-        if (fileName.startsWith("_")) {
-          continue;
-        }
-        if (StoreFileInfo.isReference(fileName)) {
-          LOG.warn("Skipping reference " + fileName);
-          continue;
-        }
-        if (HFileLink.isHFileLink(fileName)) {
-          LOG.warn("Skipping HFileLink " + fileName);
-          continue;
-        }
-
-        // Validate HFile Format if needed
-        if (validateHFile) {
-          try {
-            if (!HFile.isHFileFormat(fs, hfile)) {
-              LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
-              continue;
-            }
-          } catch (FileNotFoundException e) {
-            LOG.warn("the file " + hfile + " was removed");
-            continue;
-          }
-	}
-
-        visitor.bulkHFile(family, hfileStatus);
-      }
-    }
-  }
-
-  /**
-   * Represents an HFile waiting to be loaded. An queue is used
-   * in this class in order to support the case where a region has
-   * split during the process of the load. When this happens,
-   * the HFile is split into two physical parts across the new
-   * region boundary, and each part is added back into the queue.
-   * The import process finishes when the queue is empty.
-   */
-  public static class LoadQueueItem {
-    final byte[] family;
-    final Path hfilePath;
+  @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS",
+      justification = "Temporary glue. To be removed")
+  @Deprecated
+  @InterfaceAudience.Public
+  public static class LoadQueueItem
+      extends org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem {
 
     public LoadQueueItem(byte[] family, Path hfilePath) {
-      this.family = family;
-      this.hfilePath = hfilePath;
-    }
-
-    @Override
-    public String toString() {
-      return "family:"+ Bytes.toString(family) + " path:" + hfilePath.toString();
-    }
-
-    public byte[] getFamily() {
-      return family;
-    }
-
-    public Path getFilePath() {
-      return hfilePath;
-    }
-  }
-
-  /*
-   * Populate the Queue with given HFiles
-   */
-  private void populateLoadQueue(final Deque<LoadQueueItem> ret,
-      Map<byte[], List<Path>> map) throws IOException {
-    for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
-      for (Path p : entry.getValue()) {
-        ret.add(new LoadQueueItem(entry.getKey(), p));
-      }
+      super(family, hfilePath);
     }
   }
 
-  /**
-   * Walk the given directory for all HFiles, and return a Queue
-   * containing all such files.
-   */
-  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
-      final boolean validateHFile) throws IOException {
-    fs = hfofDir.getFileSystem(getConf());
-    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
-      @Override
-      public byte[] bulkFamily(final byte[] familyName) {
-        return familyName;
-      }
-      @Override
-      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
-        long length = hfile.getLen();
-        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
-            HConstants.DEFAULT_MAX_FILE_SIZE)) {
-          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
-              length + " bytes can be problematic as it may lead to oversplitting.");
-        }
-        ret.add(new LoadQueueItem(family, hfile.getPath()));
-      }
-    }, validateHFile);
-  }
-
-  /**
-   * Perform a bulk load of the given directory into the given
-   * pre-existing table.  This method is not threadsafe.
-   *
-   * @param hfofDir the directory that was provided as the output path
-   *   of a job using HFileOutputFormat
-   * @param admin the Admin
-   * @param table the table to load into
-   * @param regionLocator region locator
-   * @throws TableNotFoundException if table does not yet exist
-   */
-  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator) throws TableNotFoundException, IOException {
-    doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
-  }
-
-  void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
-      SecureBulkLoadClient secureClient) throws IOException {
-    fsDelegationToken.releaseDelegationToken();
-    if (bulkToken != null && secureClient != null) {
-      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
-    }
-    if (pool != null) {
-      pool.shutdown();
-    }
-    if (!queue.isEmpty()) {
-      StringBuilder err = new StringBuilder();
-      err.append("-------------------------------------------------\n");
-      err.append("Bulk load aborted with some files not yet loaded:\n");
-      err.append("-------------------------------------------------\n");
-      for (LoadQueueItem q : queue) {
-        err.append("  ").append(q.hfilePath).append('\n');
-      }
-      LOG.error(err);
-    }
-  }
-  /**
-   * Perform a bulk load of the given directory into the given
-   * pre-existing table.  This method is not threadsafe.
-   *
-   * @param map map of family to List of hfiles
-   * @param admin the Admin
-   * @param table the table to load into
-   * @param regionLocator region locator
-   * @param silence true to ignore unmatched column families
-   * @param copyFile always copy hfiles if true
-   * @throws TableNotFoundException if table does not yet exist
-   */
-  public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
-      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
-          throws TableNotFoundException, IOException {
-    if (!admin.isTableAvailable(regionLocator.getName())) {
-      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
-    }
-    // LQI queue does not need to be threadsafe -- all operations on this queue
-    // happen in this thread
-    Deque<LoadQueueItem> queue = new LinkedList<>();
-    ExecutorService pool = null;
-    SecureBulkLoadClient secureClient = null;
-    try {
-      prepareHFileQueue(map, table, queue, silence);
-      if (queue.isEmpty()) {
-        LOG.warn("Bulk load operation did not get any files to load");
-        return;
-      }
-      pool = createExecutorService();
-      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
-      for (Map.Entry<byte[], List<Path>> entry : map.entrySet()) {
-        for (Path p : entry.getValue()) {
-          fs = p.getFileSystem(table.getConfiguration());
-          break;
-        }
-      }
-      retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
-    } finally {
-      cleanup(admin, queue, pool, secureClient);
-    }
-  }
-
-  /**
-   * Perform a bulk load of the given directory into the given
-   * pre-existing table.  This method is not threadsafe.
-   *
-   * @param hfofDir the directory that was provided as the output path
-   *   of a job using HFileOutputFormat
-   * @param admin the Admin
-   * @param table the table to load into
-   * @param regionLocator region locator
-   * @param silence true to ignore unmatched column families
-   * @param copyFile always copy hfiles if true
-   * @throws TableNotFoundException if table does not yet exist
-   */
-  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator, boolean silence, boolean copyFile)
-          throws TableNotFoundException, IOException {
-    if (!admin.isTableAvailable(regionLocator.getName())) {
-      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
-    }
-
-    /*
-     * Checking hfile format is a time-consuming operation, we should have an option to skip
-     * this step when bulkloading millions of HFiles. See HBASE-13985.
-     */
-    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
-    if (!validateHFile) {
-      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
-          "are not correct. If you fail to read data from your table after using this " +
-          "option, consider removing the files and bulkload again without this option. " +
-          "See HBASE-13985");
-    }
-    // LQI queue does not need to be threadsafe -- all operations on this queue
-    // happen in this thread
-    Deque<LoadQueueItem> queue = new LinkedList<>();
-    ExecutorService pool = null;
-    SecureBulkLoadClient secureClient = null;
-    try {
-      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
-
-      if (queue.isEmpty()) {
-        LOG.warn("Bulk load operation did not find any files to load in " +
-            "directory " + hfofDir != null ? hfofDir.toUri() : "" + ".  Does it contain files in " +
-            "subdirectories that correspond to column family names?");
-        return;
-      }
-      pool = createExecutorService();
-      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
-      retValue = performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
-    } finally {
-      cleanup(admin, queue, pool, secureClient);
-    }
-  }
-
-  Map<LoadQueueItem, ByteBuffer> performBulkLoad(final Admin admin, Table table,
-      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
-      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
-    int count = 0;
-
-    if(isSecureBulkLoadEndpointAvailable()) {
-      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
-      LOG.warn("Secure bulk load has been integrated into HBase core.");
-    }
-
-    //If using secure bulk load, get source delegation token, and
-    //prepare staging directory and token
-    // fs is the source filesystem
-    fsDelegationToken.acquireDelegationToken(fs);
-    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
-    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
-
-    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
-    // Assumes that region splits can happen while this occurs.
-    while (!queue.isEmpty()) {
-      // need to reload split keys each iteration.
-      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
-      if (count != 0) {
-        LOG.info("Split occurred while grouping HFiles, retry attempt " +
-            + count + " with " + queue.size() + " files remaining to group or split");
-      }
-
-      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
-      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
-      if (maxRetries != 0 && count >= maxRetries) {
-        throw new IOException("Retry attempted " + count +
-            " times without completing, bailing out");
-      }
-      count++;
-
-      // Using ByteBuffer for byte[] equality semantics
-      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
-      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
-
-      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
-        // Error is logged inside checkHFilesCountPerRegionPerFamily.
-        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily
-            + " hfiles to one family of one region");
-      }
-
-      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
-          item2RegionMap);
-
-      // NOTE: The next iteration's split / group could happen in parallel to
-      // atomic bulkloads assuming that there are splits and no merges, and
-      // that we can atomically pull out the groups we want to retry.
-    }
-
-    if (!queue.isEmpty()) {
-      throw new RuntimeException("Bulk load aborted with some files not yet loaded."
-        + "Please check log for more details.");
-    }
-    return item2RegionMap;
-  }
-
-  protected ClientServiceCallable<byte[]> buildClientServiceCallable(final Connection conn,
-      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
-
-    final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
-    for (LoadQueueItem lqi : lqis) {
-        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
-    }
-
-    return new ClientServiceCallable<byte[]>(conn,
-        tableName, first, rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
-      @Override
-      protected byte[] rpcCall() throws Exception {
-        SecureBulkLoadClient secureClient = null;
-        boolean success = false;
-        try {
-          LOG.debug("Going to connect to server " + getLocation() + " for row "
-              + Bytes.toStringBinary(getRow()) + " with hfile group " +
-              LoadIncrementalHFiles.this.toString( famPaths));
-          byte[] regionName = getLocation().getRegionInfo().getRegionName();
-          try (Table table = conn.getTable(getTableName())) {
-            secureClient = new SecureBulkLoadClient(getConf(), table);
-            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-                assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
-          }
-          return success ? regionName : null;
-        } finally {
-          //Best effort copying of files that might not have been imported
-          //from the staging directory back to original location
-          //in user directory
-          if (secureClient != null && !success) {
-            FileSystem targetFs = FileSystem.get(getConf());
-            // fs is the source filesystem
-            if (fs == null) {
-              fs = lqis.iterator().next().hfilePath.getFileSystem(getConf());
-            }
-            // Check to see if the source and target filesystems are the same
-            // If they are the same filesystem, we will try move the files back
-            // because previously we moved them to the staging directory.
-            if (FSHDFSUtils.isSameHdfs(getConf(), fs, targetFs)) {
-              for (Pair<byte[], String> el : famPaths) {
-                Path hfileStagingPath = null;
-                Path hfileOrigPath = new Path(el.getSecond());
-                try {
-                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
-                      hfileOrigPath.getName());
-                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
-                    LOG.debug("Moved back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  } else if (targetFs.exists(hfileStagingPath)) {
-                    LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                        hfileStagingPath);
-                  }
-                } catch (Exception ex) {
-                  LOG.debug("Unable to move back file " + hfileOrigPath + " from " +
-                      hfileStagingPath, ex);
-                }
-              }
-            }
-          }
-        }
-      }
-    };
-  }
-
-  /**
-   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
-   * passed directory and validates whether the prepared queue has all the valid table column
-   * families in it.
-   * @param hfilesDir directory containing list of hfiles to be loaded into the table
-   * @param table table to which hfiles should be loaded
-   * @param queue queue which needs to be loaded into the table
-   * @param validateHFile if true hfiles will be validated for its format
-   * @throws IOException If any I/O or network error occurred
-   */
-  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
-      boolean validateHFile) throws IOException {
-    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
-  }
-
-  /**
-   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
-   * passed directory and validates whether the prepared queue has all the valid table column
-   * families in it.
-   * @param hfilesDir directory containing list of hfiles to be loaded into the table
-   * @param table table to which hfiles should be loaded
-   * @param queue queue which needs to be loaded into the table
-   * @param validateHFile if true hfiles will be validated for its format
-   * @param silence  true to ignore unmatched column families
-   * @throws IOException If any I/O or network error occurred
-   */
-  public void prepareHFileQueue(Path hfilesDir, Table table,
-      Deque<LoadQueueItem> queue, boolean validateHFile, boolean silence) throws IOException {
-    discoverLoadQueue(queue, hfilesDir, validateHFile);
-    validateFamiliesInHFiles(table, queue, silence);
-  }
-
-  /**
-   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
-   * passed directory and validates whether the prepared queue has all the valid table column
-   * families in it.
-   * @param map map of family to List of hfiles
-   * @param table table to which hfiles should be loaded
-   * @param queue queue which needs to be loaded into the table
-   * @param silence  true to ignore unmatched column families
-   * @throws IOException If any I/O or network error occurred
-   */
-  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
-      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
-    populateLoadQueue(queue, map);
-    validateFamiliesInHFiles(table, queue, silence);
-  }
-
-  // Initialize a thread pool
-  private ExecutorService createExecutorService() {
-    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
-    builder.setNameFormat("LoadIncrementalHFiles-%1$d");
-    ExecutorService pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
-        new LinkedBlockingQueue<>(), builder.build());
-    ((ThreadPoolExecutor) pool).allowCoreThreadTimeOut(true);
-    return pool;
-  }
-
-  /**
-   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
-   */
-  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
-      throws IOException {
-    ColumnFamilyDescriptor[] families = table.getDescriptor().getColumnFamilies();
-    List<String> familyNames = new ArrayList<>(families.length);
-    for (ColumnFamilyDescriptor family : families) {
-      familyNames.add(family.getNameAsString());
-    }
-    Iterator<LoadQueueItem> queueIter = queue.iterator();
-    while (queueIter.hasNext()) {
-      LoadQueueItem lqi = queueIter.next();
-      String familyNameInHFile = Bytes.toString(lqi.family);
-      if (!familyNames.contains(familyNameInHFile)) {
-        unmatchedFamilies.add(familyNameInHFile);
-      }
-    }
-    if (unmatchedFamilies.size() > 0) {
-      String msg =
-          "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: "
-              + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
-              + familyNames;
-      LOG.error(msg);
-      if (!silence) throw new IOException(msg);
-    }
-  }
-
-  /**
-   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
-   * <ol>
-   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
-   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
-   * </li>
-   * </ol>
-   * @param table Table to which these hfiles should be loaded to
-   * @param conn Connection to use
-   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
-   * @param startEndKeys starting and ending row keys of the region
-   */
-  public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
-      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-    loadHFileQueue(table, conn, queue, startEndKeys, false);
-  }
-
-  /**
-   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
-   * <ol>
-   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
-   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
-   * </li>
-   * </ol>
-   * @param table Table to which these hfiles should be loaded to
-   * @param conn Connection to use
-   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
-   * @param startEndKeys starting and ending row keys of the region
-   */
-  public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
-      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
-    ExecutorService pool = null;
-    try {
-      pool = createExecutorService();
-      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
-          groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
-      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
-    } finally {
-      if (pool != null) {
-        pool.shutdown();
-      }
-    }
-  }
-
-  /**
-   * This takes the LQI's grouped by likely regions and attempts to bulk load
-   * them.  Any failures are re-queued for another pass with the
-   * groupOrSplitPhase.
-   */
-  protected void bulkLoadPhase(final Table table, final Connection conn,
-      ExecutorService pool, Deque<LoadQueueItem> queue,
-      final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
-      Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
-    // atomically bulk load the groups.
-    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
-    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
-      final byte[] first = e.getKey().array();
-      final Collection<LoadQueueItem> lqis =  e.getValue();
-
-      final ClientServiceCallable<byte[]> serviceCallable =
-          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
-
-      final Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
-        @Override
-        public List<LoadQueueItem> call() throws Exception {
-          List<LoadQueueItem> toRetry =
-              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
-          return toRetry;
-        }
-      };
-      if (item2RegionMap != null) {
-        for (LoadQueueItem lqi : lqis) {
-          item2RegionMap.put(lqi, e.getKey());
-        }
-      }
-      loadingFutures.add(pool.submit(call));
-    }
-
-    // get all the results.
-    for (Future<List<LoadQueueItem>> future : loadingFutures) {
-      try {
-        List<LoadQueueItem> toRetry = future.get();
-
-        if (item2RegionMap != null) {
-          for (LoadQueueItem lqi : toRetry) {
-            item2RegionMap.remove(lqi);
-          }
-        }
-        // LQIs that are requeued to be regrouped.
-        queue.addAll(toRetry);
-
-      } catch (ExecutionException e1) {
-        Throwable t = e1.getCause();
-        if (t instanceof IOException) {
-          // At this point something unrecoverable has happened.
-          // TODO Implement bulk load recovery
-          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
-        }
-        LOG.error("Unexpected execution exception during bulk load", e1);
-        throw new IllegalStateException(t);
-      } catch (InterruptedException e1) {
-        LOG.error("Unexpected interrupted exception during bulk load", e1);
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
-      }
-    }
-  }
-
-  private boolean checkHFilesCountPerRegionPerFamily(
-      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
-    for (Entry<ByteBuffer,
-      ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()) {
-      final Collection<LoadQueueItem> lqis =  e.getValue();
-      HashMap<byte[], MutableInt> filesMap = new HashMap<>();
-      for (LoadQueueItem lqi: lqis) {
-        MutableInt count = filesMap.get(lqi.family);
-        if (count == null) {
-          count = new MutableInt();
-          filesMap.put(lqi.family, count);
-        }
-        count.increment();
-        if (count.intValue() > maxFilesPerRegionPerFamily) {
-          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily
-            + " hfiles to family " + Bytes.toStringBinary(lqi.family)
-            + " of region with start key "
-            + Bytes.toStringBinary(e.getKey()));
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   * @param table the table to load into
-   * @param pool the ExecutorService
-   * @param queue the queue for LoadQueueItem
-   * @param startEndKeys start and end keys
-   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
-   */
-  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
-      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
-      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-    // <region start key, LQI> need synchronized only within this scope of this
-    // phase because of the puts that happen in futures.
-    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
-    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
-    Set<String> missingHFiles = new HashSet<>();
-    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = new Pair<>(regionGroups,
-        missingHFiles);
-
-    // drain LQIs and figure out bulk load groups
-    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
-    while (!queue.isEmpty()) {
-      final LoadQueueItem item = queue.remove();
-
-      final Callable<Pair<List<LoadQueueItem>, String>> call =
-          new Callable<Pair<List<LoadQueueItem>, String>>() {
-        @Override
-        public Pair<List<LoadQueueItem>, String> call() throws Exception {
-          Pair<List<LoadQueueItem>, String> splits = groupOrSplit(regionGroups, item, table,
-              startEndKeys);
-          return splits;
-        }
-      };
-      splittingFutures.add(pool.submit(call));
-    }
-    // get all the results.  All grouping and splitting must finish before
-    // we can attempt the atomic loads.
-    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
-      try {
-        Pair<List<LoadQueueItem>, String> splits = lqis.get();
-        if (splits != null) {
-          if (splits.getFirst() != null) {
-            queue.addAll(splits.getFirst());
-          } else {
-            missingHFiles.add(splits.getSecond());
-          }
-        }
-      } catch (ExecutionException e1) {
-        Throwable t = e1.getCause();
-        if (t instanceof IOException) {
-          LOG.error("IOException during splitting", e1);
-          throw (IOException)t; // would have been thrown if not parallelized,
-        }
-        LOG.error("Unexpected execution exception during splitting", e1);
-        throw new IllegalStateException(t);
-      } catch (InterruptedException e1) {
-        LOG.error("Unexpected interrupted exception during splitting", e1);
-        throw (InterruptedIOException)new InterruptedIOException().initCause(e1);
-      }
-    }
-    return pair;
-  }
-
-  // unique file name for the table
-  private String getUniqueName() {
-    return UUID.randomUUID().toString().replaceAll("-", "");
-  }
-
-  protected List<LoadQueueItem> splitStoreFile(final LoadQueueItem item,
-      final Table table, byte[] startKey,
-      byte[] splitKey) throws IOException {
-    final Path hfilePath = item.hfilePath;
-
-    Path tmpDir = item.hfilePath.getParent();
-    if (!tmpDir.getName().equals(TMP_DIR)) {
-      tmpDir = new Path(tmpDir, TMP_DIR);
-    }
-
-    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
-      "region. Splitting...");
-
-    String uniqueName = getUniqueName();
-    HColumnDescriptor familyDesc = table.getTableDescriptor().getFamily(item.family);
-
-    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
-    Path topOut = new Path(tmpDir, uniqueName + ".top");
-    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
-
-    FileSystem fs = tmpDir.getFileSystem(getConf());
-    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
-    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
-    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
-
-    // Add these back at the *front* of the queue, so there's a lower
-    // chance that the region will just split again before we get there.
-    List<LoadQueueItem> lqis = new ArrayList<>(2);
-    lqis.add(new LoadQueueItem(item.family, botOut));
-    lqis.add(new LoadQueueItem(item.family, topOut));
-
-    // If the current item is already the result of previous splits,
-    // we don't need it anymore. Clean up to save space.
-    // It is not part of the original input files.
-    try {
-      tmpDir = item.hfilePath.getParent();
-      if (tmpDir.getName().equals(TMP_DIR)) {
-        fs.delete(item.hfilePath, false);
-      }
-    } catch (IOException e) {
-      LOG.warn("Unable to delete temporary split file " + item.hfilePath);
-    }
-    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
-    return lqis;
-  }
-
-  /**
-   * Attempt to assign the given load queue item into its target region group.
-   * If the hfile boundary no longer fits into a region, physically splits
-   * the hfile such that the new bottom half will fit and returns the list of
-   * LQI's corresponding to the resultant hfiles.
-   *
-   * protected for testing
-   * @throws IOException if an IO failure is encountered
-   */
-  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
-      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
-      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-    final Path hfilePath = item.hfilePath;
-    // fs is the source filesystem
-    if (fs == null) {
-      fs = hfilePath.getFileSystem(getConf());
-    }
-    HFile.Reader hfr = null;
-    try {
-      hfr = HFile.createReader(fs, hfilePath, new CacheConfig(getConf()), true, getConf());
-    } catch (FileNotFoundException fnfe) {
-      LOG.debug("encountered", fnfe);
-      return new Pair<>(null, hfilePath.getName());
-    }
-    final byte[] first, last;
-    try {
-      hfr.loadFileInfo();
-      first = hfr.getFirstRowKey();
-      last = hfr.getLastRowKey();
-    }  finally {
-      hfr.close();
-    }
-
-    LOG.info("Trying to load hfile=" + hfilePath +
-        " first=" + Bytes.toStringBinary(first) +
-        " last="  + Bytes.toStringBinary(last));
-    if (first == null || last == null) {
-      assert first == null && last == null;
-      // TODO what if this is due to a bad HFile?
-      LOG.info("hfile " + hfilePath + " has no entries, skipping");
-      return null;
-    }
-    if (Bytes.compareTo(first, last) > 0) {
-      throw new IllegalArgumentException(
-      "Invalid range: " + Bytes.toStringBinary(first) +
-      " > " + Bytes.toStringBinary(last));
-    }
-    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
-        Bytes.BYTES_COMPARATOR);
-    if (idx < 0) {
-      // not on boundary, returns -(insertion index).  Calculate region it
-      // would be in.
-      idx = -(idx + 1) - 1;
-    }
-    final int indexForCallable = idx;
-
-    /**
-     * we can consider there is a region hole in following conditions. 1) if idx < 0,then first
-     * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
-     * region. 3) if the endkey of the last region is not empty.
-     */
-    if (indexForCallable < 0) {
-      throw new IOException("The first region info for table "
-          + table.getName()
-          + " can't be found in hbase:meta.Please use hbck tool to fix it first.");
-    } else if ((indexForCallable == startEndKeys.getFirst().length - 1)
-        && !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
-      throw new IOException("The last region info for table "
-          + table.getName()
-          + " can't be found in hbase:meta.Please use hbck tool to fix it first.");
-    } else if (indexForCallable + 1 < startEndKeys.getFirst().length
-        && !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
-          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
-      throw new IOException("The endkey of one region for table "
-          + table.getName()
-          + " is not equal to the startkey of the next region in hbase:meta."
-          + "Please use hbck tool to fix it first.");
-    }
-
-    boolean lastKeyInRange =
-      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
-      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
-    if (!lastKeyInRange) {
-      List<LoadQueueItem> lqis = splitStoreFile(item, table,
-          startEndKeys.getFirst()[indexForCallable],
-          startEndKeys.getSecond()[indexForCallable]);
-      return new Pair<>(lqis, null);
-    }
-
-    // group regions.
-    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
-    return null;
-  }
-
-  /**
-   * Attempts to do an atomic load of many hfiles into a region.  If it fails,
-   * it returns a list of hfiles that need to be retried.  If it is successful
-   * it will return an empty list.
-   *
-   * NOTE: To maintain row atomicity guarantees, region server callable should
-   * succeed atomically and fails atomically.
-   *
-   * Protected for testing.
-   *
-   * @return empty list if success, list of items to retry on recoverable
-   *   failure
-   */
-  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
-      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
-      throws IOException {
-    try {
-      List<LoadQueueItem> toRetry = new ArrayList<>();
-      Configuration conf = getConf();
-      byte[] region = RpcRetryingCallerFactory.instantiate(conf,
-          null).<byte[]> newCaller()
-          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
-      if (region == null) {
-        LOG.warn("Attempt to bulk load region containing "
-            + Bytes.toStringBinary(first) + " into table "
-            + tableName  + " with files " + lqis
-            + " failed.  This is recoverable and they will be retried.");
-        toRetry.addAll(lqis); // return lqi's to retry
-      }
-      // success
-      return toRetry;
-    } catch (IOException e) {
-      LOG.error("Encountered unrecoverable error from region server, additional details: "
-          + serviceCallable.getExceptionMessageAdditionalDetail(), e);
-      throw e;
-    }
-  }
-
-  private final String toString(List<Pair<byte[], String>> list) {
-    StringBuffer sb = new StringBuffer();
-    sb.append("[");
-    if(list != null){
-      for(Pair<byte[], String> pair: list) {
-        sb.append("{");
-        sb.append(Bytes.toStringBinary(pair.getFirst()));
-        sb.append(",");
-        sb.append(pair.getSecond());
-        sb.append("}");
-      }
-    }
-    sb.append("]");
-    return sb.toString();
-  }
-  private boolean isSecureBulkLoadEndpointAvailable() {
-    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
-    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
-  }
-
-  /**
-   * Split a storefile into a top and bottom half, maintaining
-   * the metadata, recreating bloom filters, etc.
-   */
-  static void splitStoreFile(
-      Configuration conf, Path inFile,
-      HColumnDescriptor familyDesc, byte[] splitKey,
-      Path bottomOut, Path topOut) throws IOException {
-    // Open reader with no block cache, and not in-memory
-    Reference topReference = Reference.createTopReference(splitKey);
-    Reference bottomReference = Reference.createBottomReference(splitKey);
-
-    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
-    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
-  }
-
-  /**
-   * Copy half of an HFile into a new HFile.
-   */
-  private static void copyHFileHalf(
-      Configuration conf, Path inFile, Path outFile, Reference reference,
-      HColumnDescriptor familyDescriptor)
-  throws IOException {
-    FileSystem fs = inFile.getFileSystem(conf);
-    CacheConfig cacheConf = new CacheConfig(conf);
-    HalfStoreFileReader halfReader = null;
-    StoreFileWriter halfWriter = null;
-    try {
-      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
-          new AtomicInteger(0), true, conf);
-      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
-
-      int blocksize = familyDescriptor.getBlocksize();
-      Algorithm compression = familyDescriptor.getCompressionType();
-      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
-      HFileContext hFileContext = new HFileContextBuilder()
-                                  .withCompression(compression)
-                                  .withChecksumType(HStore.getChecksumType(conf))
-                                  .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf))
-                                  .withBlockSize(blocksize)
-                                  .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding())
-                                  .withIncludesTags(true)
-                                  .build();
-      halfWriter = new StoreFileWriter.Builder(conf, cacheConf,
-          fs)
-              .withFilePath(outFile)
-              .withBloomType(bloomFilterType)
-              .withFileContext(hFileContext)
-              .build();
-      HFileScanner scanner = halfReader.getScanner(false, false, false);
-      scanner.seekTo();
-      do {
-        halfWriter.append(scanner.getCell());
-      } while (scanner.next());
-
-      for (Map.Entry<byte[],byte[]> entry : fileInfo.entrySet()) {
-        if (shouldCopyHFileMetaKey(entry.getKey())) {
-          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
-        }
-      }
-    } finally {
-      if (halfWriter != null) {
-        halfWriter.close();
-      }
-      if (halfReader != null) {
-        halfReader.close(cacheConf.shouldEvictOnClose());
-      }
-    }
-  }
-
-  private static boolean shouldCopyHFileMetaKey(byte[] key) {
-    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
-    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
-      return false;
-    }
-
-    return !HFile.isReservedFileInfoKey(key);
-  }
-
-  /*
-   * Infers region boundaries for a new table.
-   * Parameter:
-   *   bdryMap is a map between keys to an integer belonging to {+1, -1}
-   *     If a key is a start key of a file, then it maps to +1
-   *     If a key is an end key of a file, then it maps to -1
-   * Algo:
-   * 1) Poll on the keys in order:
-   *    a) Keep adding the mapped values to these keys (runningSum)
-   *    b) Each time runningSum reaches 0, add the start Key from when the runningSum had started to
-   *       a boundary list.
-   * 2) Return the boundary list.
-   */
-  public static byte[][] inferBoundaries(TreeMap<byte[], Integer> bdryMap) {
-    ArrayList<byte[]> keysArray = new ArrayList<>();
-    int runningValue = 0;
-    byte[] currStartKey = null;
-    boolean firstBoundary = true;
-
-    for (Map.Entry<byte[], Integer> item: bdryMap.entrySet()) {
-      if (runningValue == 0) {
-        currStartKey = item.getKey();
-      }
-      runningValue += item.getValue();
-      if (runningValue == 0) {
-        if (!firstBoundary) {
-          keysArray.add(currStartKey);
-        }
-        firstBoundary = false;
-      }
-    }
-
-    return keysArray.toArray(new byte[0][0]);
-  }
-
-  /*
-   * If the table is created for the first time, then "completebulkload" reads the files twice.
-   * More modifications necessary if we want to avoid doing it.
-   */
-  private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
-    final Path hfofDir = new Path(dirPath);
-    final FileSystem fs = hfofDir.getFileSystem(getConf());
-
-    // Add column families
-    // Build a set of keys
-    final HTableDescriptor htd = new HTableDescriptor(tableName);
-    final TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<HColumnDescriptor>() {
-      @Override
-      public HColumnDescriptor bulkFamily(final byte[] familyName) {
-        HColumnDescriptor hcd = new HColumnDescriptor(familyName);
-        htd.addFamily(hcd);
-        return hcd;
-      }
-      @Override
-      public void bulkHFile(final HColumnDescriptor hcd, final FileStatus hfileStatus)
-          throws IOException {
-        Path hfile = hfileStatus.getPath();
-        try (HFile.Reader reader =
-            HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
-          if (hcd.getCompressionType() != reader.getFileContext().getCompression()) {
-            hcd.setCompressionType(reader.getFileContext().getCompression());
-            LOG.info("Setting compression " + hcd.getCompressionType().name() + " for family " +
-                hcd.toString());
-          }
-          reader.loadFileInfo();
-          byte[] first = reader.getFirstRowKey();
-          byte[] last = reader.getLastRowKey();
-
-          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
-              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
-
-          // To eventually infer start key-end key boundaries
-          Integer value = map.containsKey(first) ? map.get(first) : 0;
-          map.put(first, value + 1);
-
-          value = map.containsKey(last) ? map.get(last) : 0;
-          map.put(last, value - 1);
-        }
-      }
-    });
-
-    byte[][] keys = LoadIncrementalHFiles.inferBoundaries(map);
-    admin.createTable(htd, keys);
-
-    LOG.info("Table "+ tableName +" is available!!");
+  public LoadIncrementalHFiles(Configuration conf) {
+    super(conf);
   }
 
   public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
       TableName tableName) throws IOException {
-    initialize();
-    try (Connection connection = ConnectionFactory.createConnection(getConf());
-        Admin admin = connection.getAdmin()) {
-
-      boolean tableExists = admin.tableExists(tableName);
-      if (!tableExists) {
-        if (dirPath != null && "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
-          this.createTable(tableName, dirPath, admin);
-        } else {
-          String errorMsg = format("Table '%s' does not exist.", tableName);
-          LOG.error(errorMsg);
-          throw new TableNotFoundException(errorMsg);
-        }
-      }
-      Path hfofDir = null;
-      if (dirPath != null) {
-        hfofDir = new Path(dirPath);
-      }
-
-      try (Table table = connection.getTable(tableName);
-        RegionLocator locator = connection.getRegionLocator(tableName)) {
-        boolean silence = "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
-        boolean copyFiles = getConf().getBoolean(ALWAYS_COPY_FILES, false);
-        if (dirPath != null) {
-          doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
-        } else {
-          doBulkLoad(map, admin, table, locator, silence, copyFiles);
-        }
-        return retValue;
-      }
-    }
-  }
-
-  @Override
-  public int run(String[] args) throws Exception {
-    if (args.length < 2) {
-      usage();
-      return -1;
-    }
-
-    String dirPath = args[0];
-    TableName tableName = TableName.valueOf(args[1]);
-    Map<LoadQueueItem, ByteBuffer> loaded = run(dirPath, null, tableName);
-    if (loaded == null || !loaded.isEmpty()) return 0;
-    return -1;
-  }
-
-  public static void main(String[] args) throws Exception {
-    Configuration conf = HBaseConfiguration.create();
-    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
-    System.exit(ret);
-  }
-
-  /**
-   * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
-   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
-   * property. This directory is used as a temporary directory where all files are initially
-   * copied/moved from user given directory, set all the required file permissions and then from
-   * their it is finally loaded into a table. This should be set only when, one would like to manage
-   * the staging directory by itself. Otherwise this tool will handle this by itself.
-   * @param stagingDir staging directory path
-   */
-  public void setBulkToken(String stagingDir) {
-    this.bulkToken = stagingDir;
+    Map<org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> originRet;
+    if (dirPath != null) {
+      originRet = run(dirPath, tableName);
+    } else {
+      originRet = run(map, tableName);
+    }
+    Map<LoadQueueItem, ByteBuffer> ret = new HashMap<>();
+    originRet.forEach((k, v) -> {
+      ret.put(new LoadQueueItem(k.getFamily(), k.getFilePath()), v);
+    });
+    return ret;
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 2308ddf..741f200 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.crypto.Encryption;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFile.Reader;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.mob.MobConstants;
 import org.apache.hadoop.hbase.mob.MobFileName;
 import org.apache.hadoop.hbase.mob.MobUtils;
@@ -82,6 +81,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.security.EncryptionUtil;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Pair;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index b3556c6..9cc33d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -45,8 +45,8 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionLocator;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;


[2/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated

Posted by zh...@apache.org.
http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
new file mode 100644
index 0000000..1f27d04
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -0,0 +1,1251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static java.lang.String.format;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ClientServiceCallable;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.HalfStoreFileReader;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.HashMultimap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimaps;
+import org.apache.hadoop.hbase.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+/**
+ * Tool to load the output of HFileOutputFormat into an existing table.
+ */
+@InterfaceAudience.Public
+public class LoadIncrementalHFiles extends Configured implements Tool {
+
+  private static final Log LOG = LogFactory.getLog(LoadIncrementalHFiles.class);
+
+  public static final String NAME = "completebulkload";
+  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
+  public static final String MAX_FILES_PER_REGION_PER_FAMILY =
+      "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
+  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+  public final static String CREATE_TABLE_CONF_KEY = "create.table";
+  public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
+  public final static String ALWAYS_COPY_FILES = "always.copy.files";
+
+  // We use a '.' prefix which is ignored when walking directory trees
+  // above. It is invalid family name.
+  static final String TMP_DIR = ".tmp";
+
+  private final int maxFilesPerRegionPerFamily;
+  private final boolean assignSeqIds;
+
+  // Source delegation token
+  private final FsDelegationToken fsDelegationToken;
+  private final UserProvider userProvider;
+  private final int nrThreads;
+  private final RpcControllerFactory rpcControllerFactory;
+
+  private String bulkToken;
+
+  /**
+   * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
+   * the case where a region has split during the process of the load. When this happens, the HFile
+   * is split into two physical parts across the new region boundary, and each part is added back
+   * into the queue. The import process finishes when the queue is empty.
+   */
+  @InterfaceAudience.Public
+  public static class LoadQueueItem {
+    private final byte[] family;
+    private final Path hfilePath;
+
+    public LoadQueueItem(byte[] family, Path hfilePath) {
+      this.family = family;
+      this.hfilePath = hfilePath;
+    }
+
+    @Override
+    public String toString() {
+      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
+    }
+
+    public byte[] getFamily() {
+      return family;
+    }
+
+    public Path getFilePath() {
+      return hfilePath;
+    }
+  }
+
+  public LoadIncrementalHFiles(Configuration conf) {
+    // make a copy, just to be sure we're not overriding someone else's config
+    super(HBaseConfiguration.create(conf));
+    conf = getConf();
+    // disable blockcache for tool invocation, see HBASE-10500
+    conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+    userProvider = UserProvider.instantiate(conf);
+    fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+    assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
+    maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+      Runtime.getRuntime().availableProcessors());
+    rpcControllerFactory = new RpcControllerFactory(conf);
+  }
+
+  private void usage() {
+    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D" +
+        CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n" +
+        "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D" +
+        IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used to ignore unmatched column families\n" +
+        "\n");
+  }
+
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param hfilesDir directory containing list of hfiles to be loaded into the table
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true hfiles will be validated for its format
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile) throws IOException {
+    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+  }
+
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param hfilesDir directory containing list of hfiles to be loaded into the table
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true hfiles will be validated for its format
+   * @param silence true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile, boolean silence) throws IOException {
+    discoverLoadQueue(queue, hfilesDir, validateHFile);
+    validateFamiliesInHFiles(table, queue, silence);
+  }
+
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param map map of family to List of hfiles
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param silence true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
+      Deque<LoadQueueItem> queue, boolean silence) throws IOException {
+    populateLoadQueue(queue, map);
+    validateFamiliesInHFiles(table, queue, silence);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given pre-existing table. This method is
+   * not threadsafe.
+   * @param hfofDir the directory that was provided as the output path of a job using
+   *          HFileOutputFormat
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
+      RegionLocator regionLocator) throws TableNotFoundException, IOException {
+    return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given pre-existing table. This method is
+   * not threadsafe.
+   * @param map map of family to List of hfiles
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @param copyFile always copy hfiles if true
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
+      Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
+      throws TableNotFoundException, IOException {
+    if (!admin.isTableAvailable(regionLocator.getName())) {
+      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+    }
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new ArrayDeque<>();
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
+    try {
+      prepareHFileQueue(map, table, queue, silence);
+      if (queue.isEmpty()) {
+        LOG.warn("Bulk load operation did not get any files to load");
+        return Collections.emptyMap();
+      }
+      pool = createExecutorService();
+      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
+    }
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given pre-existing table. This method is
+   * not threadsafe.
+   * @param hfofDir the directory that was provided as the output path of a job using
+   *          HFileOutputFormat
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @param copyFile always copy hfiles if true
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
+      RegionLocator regionLocator, boolean silence, boolean copyFile)
+      throws TableNotFoundException, IOException {
+    if (!admin.isTableAvailable(regionLocator.getName())) {
+      throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
+    }
+
+    /*
+     * Checking hfile format is a time-consuming operation, we should have an option to skip this
+     * step when bulkloading millions of HFiles. See HBASE-13985.
+     */
+    boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+    if (!validateHFile) {
+      LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
+          "are not correct. If you fail to read data from your table after using this " +
+          "option, consider removing the files and bulkload again without this option. " +
+          "See HBASE-13985");
+    }
+    // LQI queue does not need to be threadsafe -- all operations on this queue
+    // happen in this thread
+    Deque<LoadQueueItem> queue = new ArrayDeque<>();
+    ExecutorService pool = null;
+    SecureBulkLoadClient secureClient = null;
+    try {
+      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
+
+      if (queue.isEmpty()) {
+        LOG.warn(
+          "Bulk load operation did not find any files to load in " + "directory " + hfofDir != null
+              ? hfofDir.toUri()
+              : "" + ".  Does it contain files in " +
+                  "subdirectories that correspond to column family names?");
+        return Collections.emptyMap();
+      }
+      pool = createExecutorService();
+      secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
+      return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
+    } finally {
+      cleanup(admin, queue, pool, secureClient);
+    }
+  }
+
+  /**
+   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
+   * @param table Table to which these hfiles should be loaded to
+   * @param conn Connection to use
+   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
+   * @param startEndKeys starting and ending row keys of the region
+   */
+  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
+      Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    loadHFileQueue(table, conn, queue, startEndKeys, false);
+  }
+
+  /**
+   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
+   * @param table Table to which these hfiles should be loaded to
+   * @param conn Connection to use
+   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
+   * @param startEndKeys starting and ending row keys of the region
+   */
+  public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
+      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
+    ExecutorService pool = null;
+    try {
+      pool = createExecutorService();
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups =
+          groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
+      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
+    } finally {
+      if (pool != null) {
+        pool.shutdown();
+      }
+    }
+  }
+
+  private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
+      RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
+    int count = 0;
+
+    if (isSecureBulkLoadEndpointAvailable()) {
+      LOG.warn("SecureBulkLoadEndpoint is deprecated. It will be removed in future releases.");
+      LOG.warn("Secure bulk load has been integrated into HBase core.");
+    }
+
+    fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
+    bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
+    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
+
+    Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
+    // Assumes that region splits can happen while this occurs.
+    while (!queue.isEmpty()) {
+      // need to reload split keys each iteration.
+      final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
+      if (count != 0) {
+        LOG.info("Split occurred while grouping HFiles, retry attempt " + +count + " with " +
+            queue.size() + " files remaining to group or split");
+      }
+
+      int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+      maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
+      if (maxRetries != 0 && count >= maxRetries) {
+        throw new IOException(
+            "Retry attempted " + count + " times without completing, bailing out");
+      }
+      count++;
+
+      // Using ByteBuffer for byte[] equality semantics
+      pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
+
+      if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+        // Error is logged inside checkHFilesCountPerRegionPerFamily.
+        throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
+            " hfiles to one family of one region");
+      }
+
+      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
+        item2RegionMap);
+
+      // NOTE: The next iteration's split / group could happen in parallel to
+      // atomic bulkloads assuming that there are splits and no merges, and
+      // that we can atomically pull out the groups we want to retry.
+    }
+
+    if (!queue.isEmpty()) {
+      throw new RuntimeException("Bulk load aborted with some files not yet loaded." +
+          "Please check log for more details.");
+    }
+    return item2RegionMap;
+  }
+
+  /**
+   * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
+   * re-queued for another pass with the groupOrSplitPhase.
+   * <p>
+   * protected for testing.
+   */
+  @VisibleForTesting
+  protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
+      Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+      boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+    // atomically bulk load the groups.
+    Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
+    for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
+        .entrySet()) {
+      byte[] first = e.getKey().array();
+      Collection<LoadQueueItem> lqis = e.getValue();
+
+      ClientServiceCallable<byte[]> serviceCallable =
+          buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
+
+      Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
+        @Override
+        public List<LoadQueueItem> call() throws Exception {
+          List<LoadQueueItem> toRetry =
+              tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
+          return toRetry;
+        }
+      };
+      if (item2RegionMap != null) {
+        for (LoadQueueItem lqi : lqis) {
+          item2RegionMap.put(lqi, e.getKey());
+        }
+      }
+      loadingFutures.add(pool.submit(call));
+    }
+
+    // get all the results.
+    for (Future<List<LoadQueueItem>> future : loadingFutures) {
+      try {
+        List<LoadQueueItem> toRetry = future.get();
+
+        if (item2RegionMap != null) {
+          for (LoadQueueItem lqi : toRetry) {
+            item2RegionMap.remove(lqi);
+          }
+        }
+        // LQIs that are requeued to be regrouped.
+        queue.addAll(toRetry);
+
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          // At this point something unrecoverable has happened.
+          // TODO Implement bulk load recovery
+          throw new IOException("BulkLoad encountered an unrecoverable problem", t);
+        }
+        LOG.error("Unexpected execution exception during bulk load", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during bulk load", e1);
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
+      TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
+    List<Pair<byte[], String>> famPaths =
+        lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
+            .collect(Collectors.toList());
+    return new ClientServiceCallable<byte[]>(conn, tableName, first,
+        rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
+      @Override
+      protected byte[] rpcCall() throws Exception {
+        SecureBulkLoadClient secureClient = null;
+        boolean success = false;
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Going to connect to server " + getLocation() + " for row " +
+                Bytes.toStringBinary(getRow()) + " with hfile group " +
+                LoadIncrementalHFiles.this.toString(famPaths));
+          }
+          byte[] regionName = getLocation().getRegionInfo().getRegionName();
+          try (Table table = conn.getTable(getTableName())) {
+            secureClient = new SecureBulkLoadClient(getConf(), table);
+            success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
+              assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
+          }
+          return success ? regionName : null;
+        } finally {
+          // Best effort copying of files that might not have been imported
+          // from the staging directory back to original location
+          // in user directory
+          if (secureClient != null && !success) {
+            FileSystem targetFs = FileSystem.get(getConf());
+            FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
+            // Check to see if the source and target filesystems are the same
+            // If they are the same filesystem, we will try move the files back
+            // because previously we moved them to the staging directory.
+            if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
+              for (Pair<byte[], String> el : famPaths) {
+                Path hfileStagingPath = null;
+                Path hfileOrigPath = new Path(el.getSecond());
+                try {
+                  hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
+                      hfileOrigPath.getName());
+                  if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
+                    LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
+                  } else if (targetFs.exists(hfileStagingPath)) {
+                    LOG.debug(
+                      "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
+                  }
+                } catch (Exception ex) {
+                  LOG.debug(
+                    "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex);
+                }
+              }
+            }
+          }
+        }
+      }
+    };
+  }
+
+  private boolean checkHFilesCountPerRegionPerFamily(
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
+    for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
+      Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      for (LoadQueueItem lqi : e.getValue()) {
+        MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
+        count.increment();
+        if (count.intValue() > maxFilesPerRegionPerFamily) {
+          LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
+              " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
+              " of region with start key " + Bytes.toStringBinary(e.getKey()));
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @param table the table to load into
+   * @param pool the ExecutorService
+   * @param queue the queue for LoadQueueItem
+   * @param startEndKeys start and end keys
+   * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
+   */
+  private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
+      final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
+      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    // <region start key, LQI> need synchronized only within this scope of this
+    // phase because of the puts that happen in futures.
+    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
+    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+    Set<String> missingHFiles = new HashSet<>();
+    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
+        new Pair<>(regionGroups, missingHFiles);
+
+    // drain LQIs and figure out bulk load groups
+    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
+    while (!queue.isEmpty()) {
+      final LoadQueueItem item = queue.remove();
+
+      final Callable<Pair<List<LoadQueueItem>, String>> call =
+          new Callable<Pair<List<LoadQueueItem>, String>>() {
+            @Override
+            public Pair<List<LoadQueueItem>, String> call() throws Exception {
+              Pair<List<LoadQueueItem>, String> splits =
+                  groupOrSplit(regionGroups, item, table, startEndKeys);
+              return splits;
+            }
+          };
+      splittingFutures.add(pool.submit(call));
+    }
+    // get all the results. All grouping and splitting must finish before
+    // we can attempt the atomic loads.
+    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
+      try {
+        Pair<List<LoadQueueItem>, String> splits = lqis.get();
+        if (splits != null) {
+          if (splits.getFirst() != null) {
+            queue.addAll(splits.getFirst());
+          } else {
+            missingHFiles.add(splits.getSecond());
+          }
+        }
+      } catch (ExecutionException e1) {
+        Throwable t = e1.getCause();
+        if (t instanceof IOException) {
+          LOG.error("IOException during splitting", e1);
+          throw (IOException) t; // would have been thrown if not parallelized,
+        }
+        LOG.error("Unexpected execution exception during splitting", e1);
+        throw new IllegalStateException(t);
+      } catch (InterruptedException e1) {
+        LOG.error("Unexpected interrupted exception during splitting", e1);
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
+      }
+    }
+    return pair;
+  }
+
+  private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
+      byte[] startKey, byte[] splitKey) throws IOException {
+    Path hfilePath = item.getFilePath();
+    byte[] family = item.getFamily();
+    Path tmpDir = hfilePath.getParent();
+    if (!tmpDir.getName().equals(TMP_DIR)) {
+      tmpDir = new Path(tmpDir, TMP_DIR);
+    }
+
+    LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");
+
+    String uniqueName = getUniqueName();
+    ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);
+
+    Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+    Path topOut = new Path(tmpDir, uniqueName + ".top");
+    splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
+
+    FileSystem fs = tmpDir.getFileSystem(getConf());
+    fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
+    fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
+    fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
+
+    // Add these back at the *front* of the queue, so there's a lower
+    // chance that the region will just split again before we get there.
+    List<LoadQueueItem> lqis = new ArrayList<>(2);
+    lqis.add(new LoadQueueItem(family, botOut));
+    lqis.add(new LoadQueueItem(family, topOut));
+
+    // If the current item is already the result of previous splits,
+    // we don't need it anymore. Clean up to save space.
+    // It is not part of the original input files.
+    try {
+      if (tmpDir.getName().equals(TMP_DIR)) {
+        fs.delete(hfilePath, false);
+      }
+    } catch (IOException e) {
+      LOG.warn("Unable to delete temporary split file " + hfilePath);
+    }
+    LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+    return lqis;
+  }
+
+  /**
+   * Attempt to assign the given load queue item into its target region group. If the hfile boundary
+   * no longer fits into a region, physically splits the hfile such that the new bottom half will
+   * fit and returns the list of LQI's corresponding to the resultant hfiles.
+   * <p>
+   * protected for testing
+   * @throws IOException if an IO failure is encountered
+   */
+  @VisibleForTesting
+  protected Pair<List<LoadQueueItem>, String> groupOrSplit(
+      Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
+      final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    Path hfilePath = item.getFilePath();
+    byte[] first, last;
+    try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
+      new CacheConfig(getConf()), true, getConf())) {
+      hfr.loadFileInfo();
+      first = hfr.getFirstRowKey();
+      last = hfr.getLastRowKey();
+    } catch (FileNotFoundException fnfe) {
+      LOG.debug("encountered", fnfe);
+      return new Pair<>(null, hfilePath.getName());
+    }
+
+    LOG.info("Trying to load hfile=" + hfilePath + " first=" + Bytes.toStringBinary(first) +
+        " last=" + Bytes.toStringBinary(last));
+    if (first == null || last == null) {
+      assert first == null && last == null;
+      // TODO what if this is due to a bad HFile?
+      LOG.info("hfile " + hfilePath + " has no entries, skipping");
+      return null;
+    }
+    if (Bytes.compareTo(first, last) > 0) {
+      throw new IllegalArgumentException(
+          "Invalid range: " + Bytes.toStringBinary(first) + " > " + Bytes.toStringBinary(last));
+    }
+    int idx = Arrays.binarySearch(startEndKeys.getFirst(), first, Bytes.BYTES_COMPARATOR);
+    if (idx < 0) {
+      // not on boundary, returns -(insertion index). Calculate region it
+      // would be in.
+      idx = -(idx + 1) - 1;
+    }
+    int indexForCallable = idx;
+
+    /**
+     * we can consider there is a region hole in following conditions. 1) if idx < 0,then first
+     * region info is lost. 2) if the endkey of a region is not equal to the startkey of the next
+     * region. 3) if the endkey of the last region is not empty.
+     */
+    if (indexForCallable < 0) {
+      throw new IOException("The first region info for table " + table.getName() +
+          " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+    } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
+        !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
+      throw new IOException("The last region info for table " + table.getName() +
+          " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+    } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
+        !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
+          startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
+      throw new IOException("The endkey of one region for table " + table.getName() +
+          " is not equal to the startkey of the next region in hbase:meta." +
+          "Please use hbck tool to fix it first.");
+    }
+
+    boolean lastKeyInRange = Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
+        Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
+    if (!lastKeyInRange) {
+      List<LoadQueueItem> lqis = splitStoreFile(item, table,
+        startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
+      return new Pair<>(lqis, null);
+    }
+
+    // group regions.
+    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
+    return null;
+  }
+
+  /**
+   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
+   * hfiles that need to be retried. If it is successful it will return an empty list.
+   * <p>
+   * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
+   * and fails atomically.
+   * <p>
+   * Protected for testing.
+   * @return empty list if success, list of items to retry on recoverable failure
+   */
+  @VisibleForTesting
+  protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
+      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
+      throws IOException {
+    try {
+      List<LoadQueueItem> toRetry = new ArrayList<>();
+      Configuration conf = getConf();
+      byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
+          .callWithRetries(serviceCallable, Integer.MAX_VALUE);
+      if (region == null) {
+        LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
+            " into table " + tableName + " with files " + lqis +
+            " failed.  This is recoverable and they will be retried.");
+        toRetry.addAll(lqis); // return lqi's to retry
+      }
+      // success
+      return toRetry;
+    } catch (IOException e) {
+      LOG.error("Encountered unrecoverable error from region server, additional details: " +
+          serviceCallable.getExceptionMessageAdditionalDetail(),
+        e);
+      throw e;
+    }
+  }
+
+  /**
+   * If the table is created for the first time, then "completebulkload" reads the files twice. More
+   * modifications necessary if we want to avoid doing it.
+   */
+  private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
+    final Path hfofDir = new Path(dirPath);
+    final FileSystem fs = hfofDir.getFileSystem(getConf());
+
+    // Add column families
+    // Build a set of keys
+    List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
+    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
+      @Override
+      public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
+        ColumnFamilyDescriptorBuilder builder =
+            ColumnFamilyDescriptorBuilder.newBuilder(familyName);
+        familyBuilders.add(builder);
+        return builder;
+      }
+
+      @Override
+      public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
+          throws IOException {
+        Path hfile = hfileStatus.getPath();
+        try (HFile.Reader reader =
+            HFile.createReader(fs, hfile, new CacheConfig(getConf()), true, getConf())) {
+          if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
+            builder.setCompressionType(reader.getFileContext().getCompression());
+            LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
+                " for family " + builder.getNameAsString());
+          }
+          reader.loadFileInfo();
+          byte[] first = reader.getFirstRowKey();
+          byte[] last = reader.getLastRowKey();
+
+          LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
+              Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+          // To eventually infer start key-end key boundaries
+          Integer value = map.containsKey(first) ? map.get(first) : 0;
+          map.put(first, value + 1);
+
+          value = map.containsKey(last) ? map.get(last) : 0;
+          map.put(last, value - 1);
+        }
+      }
+    });
+
+    byte[][] keys = inferBoundaries(map);
+    TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+    familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
+        .forEachOrdered(tdBuilder::addColumnFamily);
+    admin.createTable(tdBuilder.build(), keys);
+
+    LOG.info("Table " + tableName + " is available!!");
+  }
+
+  private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
+      SecureBulkLoadClient secureClient) throws IOException {
+    fsDelegationToken.releaseDelegationToken();
+    if (bulkToken != null && secureClient != null) {
+      secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
+    }
+    if (pool != null) {
+      pool.shutdown();
+    }
+    if (!queue.isEmpty()) {
+      StringBuilder err = new StringBuilder();
+      err.append("-------------------------------------------------\n");
+      err.append("Bulk load aborted with some files not yet loaded:\n");
+      err.append("-------------------------------------------------\n");
+      for (LoadQueueItem q : queue) {
+        err.append("  ").append(q.getFilePath()).append('\n');
+      }
+      LOG.error(err);
+    }
+  }
+
+  // unique file name for the table
+  private String getUniqueName() {
+    return UUID.randomUUID().toString().replaceAll("-", "");
+  }
+
+  /**
+   * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+   */
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
+      throws IOException {
+    Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
+        .map(f -> f.getNameAsString()).collect(Collectors.toSet());
+    List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
+        .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
+    if (unmatchedFamilies.size() > 0) {
+      String msg =
+          "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
+              unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " +
+              familyNames;
+      LOG.error(msg);
+      if (!silence) {
+        throw new IOException(msg);
+      }
+    }
+  }
+
+  /**
+   * Populate the Queue with given HFiles
+   */
+  private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
+    map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
+  }
+
+  /**
+   * Walk the given directory for all HFiles, and return a Queue containing all such files.
+   */
+  private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
+      final boolean validateHFile) throws IOException {
+    visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
+      @Override
+      public byte[] bulkFamily(final byte[] familyName) {
+        return familyName;
+      }
+
+      @Override
+      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+        long length = hfile.getLen();
+        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
+          HConstants.DEFAULT_MAX_FILE_SIZE)) {
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
+              " bytes can be problematic as it may lead to oversplitting.");
+        }
+        ret.add(new LoadQueueItem(family, hfile.getPath()));
+      }
+    }, validateHFile);
+  }
+
+  private interface BulkHFileVisitor<TFamily> {
+
+    TFamily bulkFamily(byte[] familyName) throws IOException;
+
+    void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
+  }
+
+  /**
+   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
+   * non-valid hfiles.
+   */
+  private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
+      final BulkHFileVisitor<TFamily> visitor) throws IOException {
+    visitBulkHFiles(fs, bulkDir, visitor, true);
+  }
+
+  /**
+   * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and
+   * skip non-valid hfiles by default, or skip this validation by setting
+   * 'hbase.loadincremental.validate.hfile' to false.
+   */
+  private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
+      BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
+    FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
+    for (FileStatus familyStat : familyDirStatuses) {
+      if (!familyStat.isDirectory()) {
+        LOG.warn("Skipping non-directory " + familyStat.getPath());
+        continue;
+      }
+      Path familyDir = familyStat.getPath();
+      byte[] familyName = familyDir.getName().getBytes();
+      // Skip invalid family
+      try {
+        ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
+      } catch (IllegalArgumentException e) {
+        LOG.warn("Skipping invalid " + familyStat.getPath());
+        continue;
+      }
+      TFamily family = visitor.bulkFamily(familyName);
+
+      FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+      for (FileStatus hfileStatus : hfileStatuses) {
+        if (!fs.isFile(hfileStatus.getPath())) {
+          LOG.warn("Skipping non-file " + hfileStatus);
+          continue;
+        }
+
+        Path hfile = hfileStatus.getPath();
+        // Skip "_", reference, HFileLink
+        String fileName = hfile.getName();
+        if (fileName.startsWith("_")) {
+          continue;
+        }
+        if (StoreFileInfo.isReference(fileName)) {
+          LOG.warn("Skipping reference " + fileName);
+          continue;
+        }
+        if (HFileLink.isHFileLink(fileName)) {
+          LOG.warn("Skipping HFileLink " + fileName);
+          continue;
+        }
+
+        // Validate HFile Format if needed
+        if (validateHFile) {
+          try {
+            if (!HFile.isHFileFormat(fs, hfile)) {
+              LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
+              continue;
+            }
+          } catch (FileNotFoundException e) {
+            LOG.warn("the file " + hfile + " was removed");
+            continue;
+          }
+        }
+
+        visitor.bulkHFile(family, hfileStatus);
+      }
+    }
+  }
+
+  // Initialize a thread pool
+  private ExecutorService createExecutorService() {
+    ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<>(),
+        new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
+    pool.allowCoreThreadTimeOut(true);
+    return pool;
+  }
+
+  private final String toString(List<Pair<byte[], String>> list) {
+    StringBuilder sb = new StringBuilder();
+    sb.append('[');
+    list.forEach(p -> {
+      sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
+          .append('}');
+    });
+    sb.append(']');
+    return sb.toString();
+  }
+
+  private boolean isSecureBulkLoadEndpointAvailable() {
+    String classes = getConf().get(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    return classes.contains("org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint");
+  }
+
+  /**
+   * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
+   * filters, etc.
+   */
+  @VisibleForTesting
+  static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
+      byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
+    // Open reader with no block cache, and not in-memory
+    Reference topReference = Reference.createTopReference(splitKey);
+    Reference bottomReference = Reference.createBottomReference(splitKey);
+
+    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
+    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+  }
+
+  /**
+   * Copy half of an HFile into a new HFile.
+   */
+  private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
+      Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
+    FileSystem fs = inFile.getFileSystem(conf);
+    CacheConfig cacheConf = new CacheConfig(conf);
+    HalfStoreFileReader halfReader = null;
+    StoreFileWriter halfWriter = null;
+    try {
+      halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
+          new AtomicInteger(0), true, conf);
+      Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
+
+      int blocksize = familyDescriptor.getBlocksize();
+      Algorithm compression = familyDescriptor.getCompressionType();
+      BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
+      HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+          .withChecksumType(HStore.getChecksumType(conf))
+          .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
+          .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
+          .build();
+      halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+          .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+      HFileScanner scanner = halfReader.getScanner(false, false, false);
+      scanner.seekTo();
+      do {
+        halfWriter.append(scanner.getCell());
+      } while (scanner.next());
+
+      for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
+        if (shouldCopyHFileMetaKey(entry.getKey())) {
+          halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
+        }
+      }
+    } finally {
+      if (halfReader != null) {
+        try {
+          halfReader.close(cacheConf.shouldEvictOnClose());
+        } catch (IOException e) {
+          LOG.warn("failed to close hfile reader for " + inFile, e);
+        }
+      }
+      if (halfWriter != null) {
+        halfWriter.close();
+      }
+
+    }
+  }
+
+  private static boolean shouldCopyHFileMetaKey(byte[] key) {
+    // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
+    if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
+      return false;
+    }
+
+    return !HFile.isReservedFileInfoKey(key);
+  }
+
+  private boolean isCreateTable() {
+    return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
+  }
+
+  private boolean isSilence() {
+    return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+  }
+
+  private boolean isAlwaysCopyFiles() {
+    return getConf().getBoolean(ALWAYS_COPY_FILES, false);
+  }
+
+  /**
+   * Perform bulk load on the given table.
+   * @param hfofDir the directory that was provided as the output path of a job using
+   *          HFileOutputFormat
+   * @param tableName the table to load into
+   */
+  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+      throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(getConf());
+        Admin admin = connection.getAdmin()) {
+      if (!admin.tableExists(tableName)) {
+        if (isCreateTable()) {
+          createTable(tableName, hfofDir, admin);
+        } else {
+          String errorMsg = format("Table '%s' does not exist.", tableName);
+          LOG.error(errorMsg);
+          throw new TableNotFoundException(errorMsg);
+        }
+      }
+      try (Table table = connection.getTable(tableName);
+          RegionLocator locator = connection.getRegionLocator(tableName)) {
+        return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(), isAlwaysCopyFiles());
+      }
+    }
+  }
+
+  /**
+   * Perform bulk load on the given table.
+   * @param family2Files map of family to List of hfiles
+   * @param tableName the table to load into
+   */
+  public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
+      TableName tableName) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(getConf());
+        Admin admin = connection.getAdmin()) {
+      if (!admin.tableExists(tableName)) {
+        String errorMsg = format("Table '%s' does not exist.", tableName);
+        LOG.error(errorMsg);
+        throw new TableNotFoundException(errorMsg);
+      }
+      try (Table table = connection.getTable(tableName);
+          RegionLocator locator = connection.getRegionLocator(tableName)) {
+        return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
+      }
+    }
+  }
+
+  @Override
+  public int run(String[] args) throws Exception {
+    if (args.length < 2) {
+      usage();
+      return -1;
+    }
+    String dirPath = args[0];
+    TableName tableName = TableName.valueOf(args[1]);
+    return !run(dirPath, tableName).isEmpty() ? 0 : -1;
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
+    System.exit(ret);
+  }
+
+  /**
+   * Called from replication sink, where it manages bulkToken(staging directory) by itself. This is
+   * used only when SecureBulkLoadEndpoint is configured in hbase.coprocessor.region.classes
+   * property. This directory is used as a temporary directory where all files are initially
+   * copied/moved from user given directory, set all the required file permissions and then from
+   * their it is finally loaded into a table. This should be set only when, one would like to manage
+   * the staging directory by itself. Otherwise this tool will handle this by itself.
+   * @param stagingDir staging directory path
+   */
+  public void setBulkToken(String stagingDir) {
+    this.bulkToken = stagingDir;
+  }
+
+  /**
+   * Infers region boundaries for a new table.
+   * <p>
+   * Parameter: <br>
+   * bdryMap is a map between keys to an integer belonging to {+1, -1}
+   * <ul>
+   * <li>If a key is a start key of a file, then it maps to +1</li>
+   * <li>If a key is an end key of a file, then it maps to -1</li>
+   * </ul>
+   * <p>
+   * Algo:<br>
+   * <ol>
+   * <li>Poll on the keys in order:
+   * <ol type="a">
+   * <li>Keep adding the mapped values to these keys (runningSum)</li>
+   * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to a
+   * boundary list.</li>
+   * </ol>
+   * </li>
+   * <li>Return the boundary list.</li>
+   * </ol>
+   */
+  public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
+    List<byte[]> keysArray = new ArrayList<>();
+    int runningValue = 0;
+    byte[] currStartKey = null;
+    boolean firstBoundary = true;
+
+    for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
+      if (runningValue == 0) {
+        currStartKey = item.getKey();
+      }
+      runningValue += item.getValue();
+      if (runningValue == 0) {
+        if (!firstBoundary) {
+          keysArray.add(currStartKey);
+        }
+        firstBoundary = false;
+      }
+    }
+
+    return keysArray.toArray(new byte[0][]);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index bc663e1..886d0da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -3484,7 +3484,7 @@ public class HBaseFsck extends Configured implements Closeable {
       errors.print("This sidelined region dir should be bulk loaded: "
         + path.toString());
       errors.print("Bulk load command looks like: "
-        + "hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles "
+        + "hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles "
         + path.toUri().getPath() + " "+ tableName);
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index c4924bb..3b7f1f8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
 import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
@@ -72,9 +71,9 @@ import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -86,6 +85,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestName;
 
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
 @Category({ CoprocessorTests.class, MediumTests.class })
 public class TestRegionObserverInterface {
   private static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
deleted file mode 100644
index b5b7a0c..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ /dev/null
@@ -1,763 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.HFileTestUtil;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-
-/**
- * Test cases for the "load" half of the HFileOutputFormat bulk load
- * functionality. These tests run faster than the full MR cluster
- * tests in TestHFileOutputFormat
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestLoadIncrementalHFiles {
-  @Rule
-  public TestName tn = new TestName();
-
-  private static final byte[] QUALIFIER = Bytes.toBytes("myqual");
-  private static final byte[] FAMILY = Bytes.toBytes("myfam");
-  private static final String NAMESPACE = "bulkNS";
-
-  static final String EXPECTED_MSG_FOR_NON_EXISTING_FAMILY = "Unmatched family names found";
-  static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
-
-  private static final byte[][] SPLIT_KEYS = new byte[][] {
-    Bytes.toBytes("ddd"),
-    Bytes.toBytes("ppp")
-  };
-
-  static HBaseTestingUtility util = new HBaseTestingUtility();
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,"");
-    util.getConfiguration().setInt(
-      LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
-      MAX_FILES_PER_REGION_PER_FAMILY);
-    // change default behavior so that tag values are returned with normal rpcs
-    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
-        KeyValueCodecWithTags.class.getCanonicalName());
-    util.startMiniCluster();
-
-    setupNamespace();
-  }
-
-  protected static void setupNamespace() throws Exception {
-    util.getAdmin().createNamespace(NamespaceDescriptor.create(NAMESPACE).build());
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    util.shutdownMiniCluster();
-  }
-
-  @Test(timeout = 120000)
-  public void testSimpleLoadWithMap() throws Exception {
-    runTest("testSimpleLoadWithMap", BloomType.NONE,
-        new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
-          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
-    },  true);
-  }
-
-  /**
-   * Test case that creates some regions and loads
-   * HFiles that fit snugly inside those regions
-   */
-  @Test(timeout = 120000)
-  public void testSimpleLoad() throws Exception {
-    runTest("testSimpleLoad", BloomType.NONE,
-        new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
-          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
-    });
-  }
-
-  @Test(timeout = 120000)
-  public void testSimpleLoadWithFileCopy() throws Exception {
-    String testName = tn.getMethodName();
-    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
-    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
-        false, null, new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
-          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
-    }, false, true);
-  }
-
-  /**
-   * Test case that creates some regions and loads
-   * HFiles that cross the boundaries of those regions
-   */
-  @Test(timeout = 120000)
-  public void testRegionCrossingLoad() throws Exception {
-    runTest("testRegionCrossingLoad", BloomType.NONE,
-        new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
-          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
-  }
-
-  /**
-   * Test loading into a column family that has a ROW bloom filter.
-   */
-  @Test(timeout = 60000)
-  public void testRegionCrossingRowBloom() throws Exception {
-    runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
-        new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
-          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
-  }
-
-  /**
-   * Test loading into a column family that has a ROWCOL bloom filter.
-   */
-  @Test(timeout = 120000)
-  public void testRegionCrossingRowColBloom() throws Exception {
-    runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
-        new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
-          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
-  }
-
-  /**
-   * Test case that creates some regions and loads HFiles that have
-   * different region boundaries than the table pre-split.
-   */
-  @Test(timeout = 120000)
-  public void testSimpleHFileSplit() throws Exception {
-    runTest("testHFileSplit", BloomType.NONE,
-        new byte[][] {
-          Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
-          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
-        },
-        new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
-          new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
-        }
-    );
-  }
-
-  /**
-   * Test case that creates some regions and loads HFiles that cross the boundaries
-   * and have different region boundaries than the table pre-split.
-   */
-  @Test(timeout = 60000)
-  public void testRegionCrossingHFileSplit() throws Exception {
-    testRegionCrossingHFileSplit(BloomType.NONE);
-  }
-
-  /**
-   * Test case that creates some regions and loads HFiles that cross the boundaries
-   * have a ROW bloom filter and a different region boundaries than the table pre-split.
-   */
-  @Test(timeout = 120000)
-  public void testRegionCrossingHFileSplitRowBloom() throws Exception {
-    testRegionCrossingHFileSplit(BloomType.ROW);
-  }
-
-  /**
-   * Test case that creates some regions and loads HFiles that cross the boundaries
-   * have a ROWCOL bloom filter and a different region boundaries than the table pre-split.
-   */
-  @Test(timeout = 120000)
-  public void testRegionCrossingHFileSplitRowColBloom() throws Exception {
-    testRegionCrossingHFileSplit(BloomType.ROWCOL);
-  }
-
-  @Test
-  public void testSplitALot() throws Exception {
-    runTest("testSplitALot", BloomType.NONE,
-      new byte[][] {
-        Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"),
-        Bytes.toBytes("ccc"), Bytes.toBytes("ddd"),
-        Bytes.toBytes("eee"), Bytes.toBytes("fff"),
-        Bytes.toBytes("ggg"), Bytes.toBytes("hhh"),
-        Bytes.toBytes("iii"), Bytes.toBytes("lll"),
-        Bytes.toBytes("mmm"), Bytes.toBytes("nnn"),
-        Bytes.toBytes("ooo"), Bytes.toBytes("ppp"),
-        Bytes.toBytes("qqq"), Bytes.toBytes("rrr"),
-        Bytes.toBytes("sss"), Bytes.toBytes("ttt"),
-        Bytes.toBytes("uuu"), Bytes.toBytes("vvv"),
-        Bytes.toBytes("zzz"),
-      },
-      new byte[][][] {
-        new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") },
-      }
-    );
-  }
-
-  private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
-    runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
-        new byte[][] {
-          Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
-          Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
-        },
-        new byte[][][] {
-          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
-          new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-        }
-    );
-  }
-
-  private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
-    HTableDescriptor htd = new HTableDescriptor(tableName);
-    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
-    familyDesc.setBloomFilterType(bloomType);
-    htd.addFamily(familyDesc);
-    return htd;
-  }
-
-  private void runTest(String testName, BloomType bloomType,
-      byte[][][] hfileRanges) throws Exception {
-    runTest(testName, bloomType, null, hfileRanges);
-  }
-
-  private void runTest(String testName, BloomType bloomType,
-      byte[][][] hfileRanges, boolean useMap) throws Exception {
-    runTest(testName, bloomType, null, hfileRanges, useMap);
-  }
-
-  private void runTest(String testName, BloomType bloomType,
-      byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
-    runTest(testName, bloomType, tableSplitKeys, hfileRanges, false);
-  }
-
-  private void runTest(String testName, BloomType bloomType,
-      byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap) throws Exception {
-    final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
-    final boolean preCreateTable = tableSplitKeys != null;
-
-    // Run the test bulkloading the table to the default namespace
-    final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
-    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
-        useMap);
-
-    // Run the test bulkloading the table to the specified namespace
-    final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
-    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
-        useMap);
-  }
-
-  private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
-          throws Exception {
-    HTableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
-  }
-
-  public static int loadHFiles(String testName, HTableDescriptor htd, HBaseTestingUtility util,
-      byte[] fam, byte[] qual, boolean preCreateTable, byte[][] tableSplitKeys,
-      byte[][][] hfileRanges, boolean useMap, boolean deleteFile,
-      boolean copyFiles, int initRowCount, int factor) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(testName);
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(fam));
-
-    int hfileIdx = 0;
-    Map<byte[], List<Path>> map = null;
-    List<Path> list = null;
-    if (useMap || copyFiles) {
-      list = new ArrayList<>();
-    }
-    if (useMap) {
-      map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      map.put(fam, list);
-    }
-    Path last = null;
-    for (byte[][] range : hfileRanges) {
-      byte[] from = range[0];
-      byte[] to = range[1];
-      Path path = new Path(familyDir, "hfile_" + hfileIdx++);
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, path, fam, qual, from, to, factor);
-      if (useMap) {
-        last = path;
-        list.add(path);
-      }
-    }
-    int expectedRows = hfileIdx * factor;
-
-    final TableName tableName = htd.getTableName();
-    if (!util.getHBaseAdmin().tableExists(tableName) && (preCreateTable || map != null)) {
-      util.getAdmin().createTable(htd, tableSplitKeys);
-    }
-
-    Configuration conf = util.getConfiguration();
-    if (copyFiles) {
-      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
-    }
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-    String [] args= {dir.toString(), tableName.toString()};
-    if (useMap) {
-      if (deleteFile) fs.delete(last);
-      Map<LoadQueueItem, ByteBuffer> loaded = loader.run(null, map, tableName);
-      if (deleteFile) {
-        expectedRows -= 1000;
-        for (LoadQueueItem item : loaded.keySet()) {
-          if (item.hfilePath.getName().equals(last.getName())) {
-            fail(last + " should be missing");
-          }
-        }
-      }
-    } else {
-      loader.run(args);
-    }
-
-    if (copyFiles) {
-      for (Path p : list) {
-        assertTrue(p + " should exist", fs.exists(p));
-      }
-    }
-
-    Table table = util.getConnection().getTable(tableName);
-    try {
-      assertEquals(initRowCount + expectedRows, util.countRows(table));
-    } finally {
-      table.close();
-    }
-
-    return expectedRows;
-  }
-
-  private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
-      boolean copyFiles) throws Exception {
-    loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys,
-        hfileRanges, useMap, true, copyFiles, 0, 1000);
-
-    final TableName tableName = htd.getTableName();
-    // verify staging folder has been cleaned up
-    Path stagingBasePath = new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
-    FileSystem fs = util.getTestFileSystem();
-    if(fs.exists(stagingBasePath)) {
-      FileStatus[] files = fs.listStatus(stagingBasePath);
-      for(FileStatus file : files) {
-        assertTrue("Folder=" + file.getPath() + " is not cleaned up.",
-          file.getPath().getName() != "DONOTERASE");
-      }
-    }
-
-    util.deleteTable(tableName);
-  }
-
-  /**
-   * Test that tags survive through a bulk load that needs to split hfiles.
-   *
-   * This test depends on the "hbase.client.rpc.codec" =  KeyValueCodecWithTags so that the client
-   * can get tags in the responses.
-   */
-  @Test(timeout = 60000)
-  public void testTagsSurviveBulkLoadSplit() throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(tn.getMethodName());
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
-    // table has these split points
-    byte [][] tableSplitKeys = new byte[][] {
-            Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
-            Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"),
-    };
-
-    // creating an hfile that has values that span the split points.
-    byte[] from = Bytes.toBytes("ddd");
-    byte[] to = Bytes.toBytes("ooo");
-    HFileTestUtil.createHFileWithTags(util.getConfiguration(), fs,
-        new Path(familyDir, tn.getMethodName()+"_hfile"),
-        FAMILY, QUALIFIER, from, to, 1000);
-    int expectedRows = 1000;
-
-    TableName tableName = TableName.valueOf(tn.getMethodName());
-    HTableDescriptor htd = buildHTD(tableName, BloomType.NONE);
-    util.getAdmin().createTable(htd, tableSplitKeys);
-
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-    String [] args= {dir.toString(), tableName.toString()};
-    loader.run(args);
-
-    Table table = util.getConnection().getTable(tableName);
-    try {
-      assertEquals(expectedRows, util.countRows(table));
-      HFileTestUtil.verifyTags(table);
-    } finally {
-      table.close();
-    }
-
-    util.deleteTable(tableName);
-  }
-
-  /**
-   * Test loading into a column family that does not exist.
-   */
-  @Test(timeout = 60000)
-  public void testNonexistentColumnFamilyLoad() throws Exception {
-    String testName = tn.getMethodName();
-    byte[][][] hFileRanges = new byte[][][] {
-      new byte[][]{ Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
-      new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
-    };
-
-    final byte[] TABLE = Bytes.toBytes("mytable_"+testName);
-    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
-    // set real family name to upper case in purpose to simulate the case that
-    // family name in HFiles is invalid
-    HColumnDescriptor family =
-        new HColumnDescriptor(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT)));
-    htd.addFamily(family);
-
-    try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
-      assertTrue("Loading into table with non-existent family should have failed", false);
-    } catch (Exception e) {
-      assertTrue("IOException expected", e instanceof IOException);
-      // further check whether the exception message is correct
-      String errMsg = e.getMessage();
-      assertTrue("Incorrect exception message, expected message: ["
-          + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY + "], current message: [" + errMsg + "]",
-          errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
-    }
-  }
-
-  @Test(timeout = 120000)
-  public void testNonHfileFolderWithUnmatchedFamilyName() throws Exception {
-    testNonHfileFolder("testNonHfileFolderWithUnmatchedFamilyName", true);
-  }
-
-  @Test(timeout = 120000)
-  public void testNonHfileFolder() throws Exception {
-    testNonHfileFolder("testNonHfileFolder", false);
-  }
-
-  /**
-   * Write a random data file and a non-file in a dir with a valid family name
-   * but not part of the table families. we should we able to bulkload without
-   * getting the unmatched family exception. HBASE-13037/HBASE-13227
-   */
-  private void testNonHfileFolder(String tableName, boolean preCreateTable) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(tableName);
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
-    HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_0"),
-        FAMILY, QUALIFIER, Bytes.toBytes("begin"), Bytes.toBytes("end"), 500);
-    createRandomDataFile(fs, new Path(familyDir, "012356789"), 16 * 1024);
-
-    final String NON_FAMILY_FOLDER = "_logs";
-    Path nonFamilyDir = new Path(dir, NON_FAMILY_FOLDER);
-    fs.mkdirs(nonFamilyDir);
-    fs.mkdirs(new Path(nonFamilyDir, "non-file"));
-    createRandomDataFile(fs, new Path(nonFamilyDir, "012356789"), 16 * 1024);
-
-    Table table = null;
-    try {
-      if (preCreateTable) {
-        table = util.createTable(TableName.valueOf(tableName), FAMILY);
-      } else {
-        table = util.getConnection().getTable(TableName.valueOf(tableName));
-      }
-
-      final String[] args = {dir.toString(), tableName};
-      new LoadIncrementalHFiles(util.getConfiguration()).run(args);
-      assertEquals(500, util.countRows(table));
-    } finally {
-      if (table != null) {
-        table.close();
-      }
-      fs.delete(dir, true);
-    }
-  }
-
-  private static void createRandomDataFile(FileSystem fs, Path path, int size)
-      throws IOException {
-    FSDataOutputStream stream = fs.create(path);
-    try {
-      byte[] data = new byte[1024];
-      for (int i = 0; i < data.length; ++i) {
-        data[i] = (byte)(i & 0xff);
-      }
-      while (size >= data.length) {
-        stream.write(data, 0, data.length);
-        size -= data.length;
-      }
-      if (size > 0) {
-        stream.write(data, 0, size);
-      }
-    } finally {
-      stream.close();
-    }
-  }
-
-  @Test(timeout = 120000)
-  public void testSplitStoreFile() throws IOException {
-    Path dir = util.getDataTestDirOnTestFS("testSplitHFile");
-    FileSystem fs = util.getTestFileSystem();
-    Path testIn = new Path(dir, "testhfile");
-    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
-    HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
-        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
-
-    Path bottomOut = new Path(dir, "bottom.out");
-    Path topOut = new Path(dir, "top.out");
-
-    LoadIncrementalHFiles.splitStoreFile(
-        util.getConfiguration(), testIn,
-        familyDesc, Bytes.toBytes("ggg"),
-        bottomOut,
-        topOut);
-
-    int rowCount = verifyHFile(bottomOut);
-    rowCount += verifyHFile(topOut);
-    assertEquals(1000, rowCount);
-  }
-
-  @Test
-  public void testSplitStoreFileWithNoneToNone() throws IOException {
-    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.NONE);
-  }
-
-  @Test
-  public void testSplitStoreFileWithEncodedToEncoded() throws IOException {
-    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.DIFF);
-  }
-
-  @Test
-  public void testSplitStoreFileWithEncodedToNone() throws IOException {
-    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.DIFF, DataBlockEncoding.NONE);
-  }
-
-  @Test
-  public void testSplitStoreFileWithNoneToEncoded() throws IOException {
-    testSplitStoreFileWithDifferentEncoding(DataBlockEncoding.NONE, DataBlockEncoding.DIFF);
-  }
-
-  private void testSplitStoreFileWithDifferentEncoding(DataBlockEncoding bulkloadEncoding,
-      DataBlockEncoding cfEncoding) throws IOException {
-    Path dir = util.getDataTestDirOnTestFS("testSplitHFileWithDifferentEncoding");
-    FileSystem fs = util.getTestFileSystem();
-    Path testIn = new Path(dir, "testhfile");
-    HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
-    familyDesc.setDataBlockEncoding(cfEncoding);
-    HFileTestUtil.createHFileWithDataBlockEncoding(
-        util.getConfiguration(), fs, testIn, bulkloadEncoding,
-        FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
-
-    Path bottomOut = new Path(dir, "bottom.out");
-    Path topOut = new Path(dir, "top.out");
-
-    LoadIncrementalHFiles.splitStoreFile(
-        util.getConfiguration(), testIn,
-        familyDesc, Bytes.toBytes("ggg"),
-        bottomOut,
-        topOut);
-
-    int rowCount = verifyHFile(bottomOut);
-    rowCount += verifyHFile(topOut);
-    assertEquals(1000, rowCount);
-  }
-
-  private int verifyHFile(Path p) throws IOException {
-    Configuration conf = util.getConfiguration();
-    HFile.Reader reader =
-        HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
-    reader.loadFileInfo();
-    HFileScanner scanner = reader.getScanner(false, false);
-    scanner.seekTo();
-    int count = 0;
-    do {
-      count++;
-    } while (scanner.next());
-    assertTrue(count > 0);
-    reader.close();
-    return count;
-  }
-
-  private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
-    Integer value = map.containsKey(first)?map.get(first):0;
-    map.put(first, value+1);
-
-    value = map.containsKey(last)?map.get(last):0;
-    map.put(last, value-1);
-  }
-
-  @Test(timeout = 120000)
-  public void testInferBoundaries() {
-    TreeMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-
-    /* Toy example
-     *     c---------i            o------p          s---------t     v------x
-     * a------e    g-----k   m-------------q   r----s            u----w
-     *
-     * Should be inferred as:
-     * a-----------------k   m-------------q   r--------------t  u---------x
-     *
-     * The output should be (m,r,u)
-     */
-
-    String first;
-    String last;
-
-    first = "a"; last = "e";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "r"; last = "s";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "o"; last = "p";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "g"; last = "k";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "v"; last = "x";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "c"; last = "i";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "m"; last = "q";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "s"; last = "t";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    first = "u"; last = "w";
-    addStartEndKeysForTest(map, first.getBytes(), last.getBytes());
-
-    byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
-    byte[][] compare = new byte[3][];
-    compare[0] = "m".getBytes();
-    compare[1] = "r".getBytes();
-    compare[2] = "u".getBytes();
-
-    assertEquals(keysArray.length, 3);
-
-    for (int row = 0; row<keysArray.length; row++){
-      assertArrayEquals(keysArray[row], compare[row]);
-    }
-  }
-
-  @Test(timeout = 60000)
-  public void testLoadTooMayHFiles() throws Exception {
-    Path dir = util.getDataTestDirOnTestFS("testLoadTooMayHFiles");
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs);
-    Path familyDir = new Path(dir, Bytes.toString(FAMILY));
-
-    byte[] from = Bytes.toBytes("begin");
-    byte[] to = Bytes.toBytes("end");
-    for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
-      HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
-          + i), FAMILY, QUALIFIER, from, to, 1000);
-    }
-
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-    String [] args= {dir.toString(), "mytable_testLoadTooMayHFiles"};
-    try {
-      loader.run(args);
-      fail("Bulk loading too many files should fail");
-    } catch (IOException ie) {
-      assertTrue(ie.getMessage().contains("Trying to load more than "
-        + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
-    }
-  }
-
-  @Test(expected = TableNotFoundException.class)
-  public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
-    Configuration conf = util.getConfiguration();
-    conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-    String[] args = { "directory", "nonExistingTable" };
-    loader.run(args);
-  }
-
-  @Test(timeout = 120000)
-  public void testTableWithCFNameStartWithUnderScore() throws Exception {
-    Path dir = util.getDataTestDirOnTestFS("cfNameStartWithUnderScore");
-    FileSystem fs = util.getTestFileSystem();
-    dir = dir.makeQualified(fs.getUri(), fs.getWorkingDirectory());
-    String family = "_cf";
-    Path familyDir = new Path(dir, family);
-
-    byte[] from = Bytes.toBytes("begin");
-    byte[] to = Bytes.toBytes("end");
-    Configuration conf = util.getConfiguration();
-    String tableName = tn.getMethodName();
-    Table table = util.createTable(TableName.valueOf(tableName), family);
-    HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
-      QUALIFIER, from, to, 1000);
-
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-    String[] args = { dir.toString(), tableName };
-    try {
-      loader.run(args);
-      assertEquals(1000, util.countRows(table));
-    } finally {
-      if (null != table) {
-        table.close();
-      }
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index 07dd2a9..5dce4ad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -40,9 +40,9 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.Assert;


[4/4] hbase git commit: HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated

Posted by zh...@apache.org.
HBASE-18699 Copy LoadIncrementalHFiles to another package and mark the old one as deprecated


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9e53f292
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9e53f292
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9e53f292

Branch: refs/heads/master
Commit: 9e53f2927b3154eb703560933ddad489c2e232b5
Parents: 7c51d3f
Author: zhangduo <zh...@apache.org>
Authored: Fri Sep 1 20:27:16 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Sep 3 19:49:42 2017 +0800

----------------------------------------------------------------------
 .../hbase/backup/impl/RestoreTablesClient.java  |    6 +-
 .../backup/mapreduce/MapReduceRestoreJob.java   |    2 +-
 .../hadoop/hbase/backup/util/BackupUtils.java   |    2 +-
 .../hadoop/hbase/backup/util/RestoreTool.java   |    2 +-
 .../hadoop/hbase/backup/TestBackupBase.java     |    2 +-
 .../TestIncrementalBackupWithBulkLoad.java      |    5 +-
 .../client/ColumnFamilyDescriptorBuilder.java   |   10 +-
 .../hbase/coprocessor/TestSecureExport.java     |    2 +-
 ...ReplicationSyncUpToolWithBulkLoadedData.java |    2 +-
 .../mapreduce/IntegrationTestBulkLoad.java      |   23 +-
 .../mapreduce/IntegrationTestImportTsv.java     |    5 +-
 .../hadoop/hbase/mapreduce/CopyTable.java       |    1 +
 .../apache/hadoop/hbase/mapreduce/Driver.java   |    1 +
 .../hbase/mapreduce/HRegionPartitioner.java     |    2 +-
 ...opSecurityEnabledUserProviderForTesting.java |   41 -
 .../hbase/mapreduce/TestHFileOutputFormat2.java |    6 +-
 .../TestLoadIncrementalHFilesSplitRecovery.java |  669 ---------
 .../TestSecureLoadIncrementalHFiles.java        |   70 -
 ...ecureLoadIncrementalHFilesSplitRecovery.java |   69 -
 .../snapshot/TestMobSecureExportSnapshot.java   |    2 +-
 .../snapshot/TestSecureExportSnapshot.java      |    2 +-
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 1284 +-----------------
 .../compactions/PartitionedMobCompactor.java    |    2 +-
 .../regionserver/HFileReplicator.java           |    4 +-
 .../hbase/tool/LoadIncrementalHFiles.java       | 1251 +++++++++++++++++
 .../org/apache/hadoop/hbase/util/HBaseFsck.java |    2 +-
 .../TestRegionObserverInterface.java            |    5 +-
 .../mapreduce/TestLoadIncrementalHFiles.java    |  763 -----------
 .../regionserver/TestScannerWithBulkload.java   |    2 +-
 .../replication/TestMasterReplication.java      |    2 +-
 ...opSecurityEnabledUserProviderForTesting.java |   41 +
 .../security/access/TestAccessController.java   |   19 +-
 .../hadoop/hbase/tool/MapreduceTestingShim.java |  171 +++
 .../hbase/tool/TestLoadIncrementalHFiles.java   |  723 ++++++++++
 .../TestLoadIncrementalHFilesSplitRecovery.java |  628 +++++++++
 .../tool/TestSecureLoadIncrementalHFiles.java   |   66 +
 ...ecureLoadIncrementalHFilesSplitRecovery.java |   66 +
 .../spark/IntegrationTestSparkBulkLoad.java     |    2 +-
 .../hbasecontext/JavaHBaseBulkLoadExample.java  |    2 +-
 .../hbase/spark/TestJavaHBaseContext.java       |    2 +-
 .../hadoop/hbase/spark/BulkLoadSuite.scala      |    2 +-
 src/main/asciidoc/_chapters/ops_mgt.adoc        |    2 +-
 42 files changed, 3041 insertions(+), 2922 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
index ea7a7b8..ff79533 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/RestoreTablesClient.java
@@ -44,8 +44,8 @@ import org.apache.hadoop.hbase.backup.util.RestoreTool;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
 
 /**
  * Restore table implementation
@@ -231,7 +231,7 @@ public class RestoreTablesClient {
         LoadIncrementalHFiles loader = BackupUtils.createLoader(conf);
         for (int i = 0; i < sTableList.size(); i++) {
           if (mapForSrc[i] != null && !mapForSrc[i].isEmpty()) {
-            loaderResult = loader.run(null, mapForSrc[i], tTableArray[i]);
+            loaderResult = loader.run(mapForSrc[i], tTableArray[i]);
             LOG.debug("bulk loading " + sTableList.get(i) + " to " + tTableArray[i]);
             if (loaderResult.isEmpty()) {
               String msg = "Couldn't bulk load for " + sTableList.get(i) + " to " + tTableArray[i];

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 1209e7c..93ea2e5 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
 import org.apache.hadoop.hbase.backup.RestoreJob;
 import org.apache.hadoop.hbase.backup.util.BackupUtils;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.util.Tool;
 
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index 11a1a3d..74bfb6c 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.backup.impl.BackupManifest.BackupImage;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSTableDescriptors;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index 2e311cf..ab56aaa 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.HFileLink;
 import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
index 7fe9a61..8752ca2 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupBase.java
@@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.SecureTestUtil;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index 769785f..f63bf29 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.mapreduce.TestLoadIncrementalHFiles;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.TestLoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.Assert;
@@ -46,8 +47,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
-
 /**
  * 1. Create table t1
  * 2. Load data to t1

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
index d25f9d1..784a250 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptorBuilder.java
@@ -420,6 +420,10 @@ public class ColumnFamilyDescriptorBuilder {
     return this;
   }
 
+  public String getNameAsString() {
+    return desc.getNameAsString();
+  }
+
   public ColumnFamilyDescriptorBuilder setBlockCacheEnabled(boolean value) {
     desc.setBlockCacheEnabled(value);
     return this;
@@ -470,6 +474,10 @@ public class ColumnFamilyDescriptorBuilder {
     return this;
   }
 
+  public Compression.Algorithm getCompressionType() {
+    return desc.getCompressionType();
+  }
+
   public ColumnFamilyDescriptorBuilder setConfiguration(final String key, final String value) {
     desc.setConfiguration(key, value);
     return this;
@@ -610,7 +618,7 @@ public class ColumnFamilyDescriptorBuilder {
      */
     @InterfaceAudience.Private
     public ModifyableColumnFamilyDescriptor(final byte[] name) {
-      this(isLegalColumnFamilyName(name), getDefaultValuesBytes(), Collections.EMPTY_MAP);
+      this(isLegalColumnFamilyName(name), getDefaultValuesBytes(), Collections.emptyMap());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
index 66d99dd..e4cd54d 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestSecureExport.java
@@ -48,10 +48,10 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.http.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.hbase.mapreduce.ExportUtils;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
 import org.apache.hadoop.hbase.mapreduce.Import;
 import org.apache.hadoop.hbase.protobuf.generated.VisibilityLabelsProtos;
 import org.apache.hadoop.hbase.security.HBaseKerberosUtils;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.AccessControlConstants;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
----------------------------------------------------------------------
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index 75f8ee2..0b33d20 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index 52f1223..cd84163 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -20,9 +20,16 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static org.junit.Assert.assertEquals;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.Log;
@@ -53,6 +60,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -77,15 +85,8 @@ import org.apache.hadoop.util.ToolRunner;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.hadoop.hbase.shaded.com.google.common.base.Joiner;
+import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
 
 /**
  * Test Bulk Load and MR on a distributed cluster.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index fb7acf4..246cb5b 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -32,21 +32,22 @@ import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.IntegrationTestingUtility;
-import org.apache.hadoop.hbase.testclassification.IntegrationTests;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
 import org.apache.hadoop.util.Tool;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index 9cccf8c..513beb4 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.Tool;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index 1c69e77..dc5214e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.util.ProgramDriver;
 
 /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
index 3c3060b..5a8ead2 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
@@ -42,7 +42,7 @@ import org.apache.hadoop.mapreduce.Partitioner;
  *
  * <p>This class is not suitable as partitioner creating hfiles
  * for incremental bulk loads as region spread will likely change between time of
- * hfile creation and load time. See {@link LoadIncrementalHFiles}
+ * hfile creation and load time. See {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles}
  * and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
  *
  * @param <KEY>  The type of the key.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java
deleted file mode 100644
index b342f64..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/HadoopSecurityEnabledUserProviderForTesting.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.security.UserProvider;
-
-/**
- * A {@link UserProvider} that always says hadoop security is enabled, regardless of the underlying
- * configuration. HBase security is <i>not enabled</i> as this is used to determine if SASL is used
- * to do the authentication, which requires a Kerberos ticket (which we currently don't have in
- * tests).
- * <p>
- * This should only be used for <b>TESTING</b>.
- */
-public class HadoopSecurityEnabledUserProviderForTesting extends UserProvider {
-
-  @Override
-  public boolean isHBaseSecurityEnabled() {
-    return false;
-  }
-
-  @Override
-  public boolean isHadoopSecurityEnabled() {
-    return true;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index c6a8761..cbff2de 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -36,6 +36,8 @@ import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -88,6 +90,7 @@ import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
+import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -109,9 +112,6 @@ import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 import org.mockito.Mockito;
 
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
 /**
  * Simple test for {@link HFileOutputFormat2}.
  * Sets up and runs a mapreduce job that writes hfile output.

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
deleted file mode 100644
index 529a448..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ /dev/null
@@ -1,669 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.mockito.Mockito;
-
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.Multimap;
-
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
-
-/**
- * Test cases for the atomic load error handling of the bulk load functionality.
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestLoadIncrementalHFilesSplitRecovery {
-  private static final Log LOG = LogFactory.getLog(TestHRegionServerBulkLoad.class);
-
-  static HBaseTestingUtility util;
-  //used by secure subclass
-  static boolean useSecure = false;
-
-  final static int NUM_CFS = 10;
-  final static byte[] QUAL = Bytes.toBytes("qual");
-  final static int ROWCOUNT = 100;
-
-  private final static byte[][] families = new byte[NUM_CFS][];
-
-  @Rule
-  public TestName name = new TestName();
-
-  static {
-    for (int i = 0; i < NUM_CFS; i++) {
-      families[i] = Bytes.toBytes(family(i));
-    }
-  }
-
-  static byte[] rowkey(int i) {
-    return Bytes.toBytes(String.format("row_%08d", i));
-  }
-
-  static String family(int i) {
-    return String.format("family_%04d", i);
-  }
-
-  static byte[] value(int i) {
-    return Bytes.toBytes(String.format("%010d", i));
-  }
-
-  public static void buildHFiles(FileSystem fs, Path dir, int value)
-      throws IOException {
-    byte[] val = value(value);
-    for (int i = 0; i < NUM_CFS; i++) {
-      Path testIn = new Path(dir, family(i));
-
-      TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
-          Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
-    }
-  }
-
-  /**
-   * Creates a table with given table name and specified number of column
-   * families if the table does not already exist.
-   */
-  private void setupTable(final Connection connection, TableName table, int cfs)
-  throws IOException {
-    try {
-      LOG.info("Creating table " + table);
-      HTableDescriptor htd = new HTableDescriptor(table);
-      for (int i = 0; i < cfs; i++) {
-        htd.addFamily(new HColumnDescriptor(family(i)));
-      }
-      try (Admin admin = connection.getAdmin()) {
-        admin.createTable(htd);
-      }
-    } catch (TableExistsException tee) {
-      LOG.info("Table " + table + " already exists");
-    }
-  }
-
-  /**
-   * Creates a table with given table name,specified number of column families<br>
-   * and splitkeys if the table does not already exist.
-   * @param table
-   * @param cfs
-   * @param SPLIT_KEYS
-   */
-  private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
-      throws IOException {
-    try {
-      LOG.info("Creating table " + table);
-      HTableDescriptor htd = new HTableDescriptor(table);
-      for (int i = 0; i < cfs; i++) {
-        htd.addFamily(new HColumnDescriptor(family(i)));
-      }
-
-      util.createTable(htd, SPLIT_KEYS);
-    } catch (TableExistsException tee) {
-      LOG.info("Table " + table + " already exists");
-    }
-  }
-
-  private Path buildBulkFiles(TableName table, int value) throws Exception {
-    Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
-    Path bulk1 = new Path(dir, table.getNameAsString() + value);
-    FileSystem fs = util.getTestFileSystem();
-    buildHFiles(fs, bulk1, value);
-    return bulk1;
-  }
-
-  /**
-   * Populate table with known values.
-   */
-  private void populateTable(final Connection connection, TableName table, int value)
-  throws Exception {
-    // create HFiles for different column families
-    LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
-    Path bulk1 = buildBulkFiles(table, value);
-    try (Table t = connection.getTable(table);
-        RegionLocator locator = connection.getRegionLocator(table);
-        Admin admin = connection.getAdmin()) {
-        lih.doBulkLoad(bulk1, admin, t, locator);
-    }
-  }
-
-  /**
-   * Split the known table in half.  (this is hard coded for this test suite)
-   */
-  private void forceSplit(TableName table) {
-    try {
-      // need to call regions server to by synchronous but isn't visible.
-      HRegionServer hrs = util.getRSForFirstRegionInTable(table);
-
-      for (HRegionInfo hri :
-          ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
-        if (hri.getTable().equals(table)) {
-          util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
-          //ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
-        }
-      }
-
-      // verify that split completed.
-      int regions;
-      do {
-        regions = 0;
-        for (HRegionInfo hri :
-            ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
-          if (hri.getTable().equals(table)) {
-            regions++;
-          }
-        }
-        if (regions != 2) {
-          LOG.info("Taking some time to complete split...");
-          Thread.sleep(250);
-        }
-      } while (regions != 2);
-    } catch (IOException e) {
-      e.printStackTrace();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
-  }
-
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    util = new HBaseTestingUtility();
-    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
-    util.startMiniCluster(1);
-  }
-
-  @AfterClass
-  public static void teardownCluster() throws Exception {
-    util.shutdownMiniCluster();
-  }
-
-  /**
-   * Checks that all columns have the expected value and that there is the
-   * expected number of rows.
-   * @throws IOException
-   */
-  void assertExpectedTable(TableName table, int count, int value) throws IOException {
-    HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
-    assertEquals(htds.length, 1);
-    Table t = null;
-    try {
-      t = util.getConnection().getTable(table);
-      Scan s = new Scan();
-      ResultScanner sr = t.getScanner(s);
-      int i = 0;
-      for (Result r : sr) {
-        i++;
-        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
-          for (byte[] val : nm.values()) {
-            assertTrue(Bytes.equals(val, value(value)));
-          }
-        }
-      }
-      assertEquals(count, i);
-    } catch (IOException e) {
-      fail("Failed due to exception");
-    } finally {
-      if (t != null) t.close();
-    }
-  }
-
-  /**
-   * Test that shows that exception thrown from the RS side will result in an
-   * exception on the LIHFile client.
-   */
-  @Test(expected=IOException.class, timeout=120000)
-  public void testBulkLoadPhaseFailure() throws Exception {
-    final TableName table = TableName.valueOf(name.getMethodName());
-    final AtomicInteger attmptedCalls = new AtomicInteger();
-    final AtomicInteger failedCalls = new AtomicInteger();
-    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
-    try (Connection connection = ConnectionFactory.createConnection(util
-        .getConfiguration())) {
-      setupTable(connection, table, 10);
-      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
-          util.getConfiguration()) {
-        @Override
-        protected List<LoadQueueItem> tryAtomicRegionLoad(
-            ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
-            Collection<LoadQueueItem> lqis) throws IOException {
-          int i = attmptedCalls.incrementAndGet();
-          if (i == 1) {
-            Connection errConn;
-            try {
-              errConn = getMockedConnection(util.getConfiguration());
-              serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
-            } catch (Exception e) {
-              LOG.fatal("mocking cruft, should never happen", e);
-              throw new RuntimeException("mocking cruft, should never happen");
-            }
-            failedCalls.incrementAndGet();
-            return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
-          }
-
-          return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
-        }
-      };
-      try {
-        // create HFiles for different column families
-        Path dir = buildBulkFiles(table, 1);
-        try (Table t = connection.getTable(table);
-            RegionLocator locator = connection.getRegionLocator(table);
-            Admin admin = connection.getAdmin()) {
-          lih.doBulkLoad(dir, admin, t, locator);
-        }
-      } finally {
-        util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
-      }
-      fail("doBulkLoad should have thrown an exception");
-    }
-  }
-
-  /**
-   * Test that shows that exception thrown from the RS side will result in the
-   * expected number of retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}
-   * when ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set
-   */
-  @Test
-  public void testRetryOnIOException() throws Exception {
-    final TableName table = TableName.valueOf(name.getMethodName());
-    final AtomicInteger calls = new AtomicInteger(1);
-    final Connection conn = ConnectionFactory.createConnection(util
-        .getConfiguration());
-    util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
-    util.getConfiguration().setBoolean(
-        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
-    final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
-        util.getConfiguration()) {
-      @Override
-      protected List<LoadQueueItem> tryAtomicRegionLoad(
-          ClientServiceCallable<byte[]> serverCallable, TableName tableName,
-          final byte[] first, Collection<LoadQueueItem> lqis)
-          throws IOException {
-        if (calls.getAndIncrement() < util.getConfiguration().getInt(
-            HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-            HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) - 1) {
-          ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(
-              conn, tableName, first, new RpcControllerFactory(
-                  util.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
-            @Override
-            public byte[] rpcCall() throws Exception {
-              throw new IOException("Error calling something on RegionServer");
-            }
-          };
-          return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
-        } else {
-          return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
-        }
-      }
-    };
-    setupTable(conn, table, 10);
-    Path dir = buildBulkFiles(table, 1);
-    lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table),
-        conn.getRegionLocator(table));
-    util.getConfiguration().setBoolean(
-        LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
-
-  }
-
-  @SuppressWarnings("deprecation")
-  private ClusterConnection getMockedConnection(final Configuration conf)
-  throws IOException, org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException {
-    ClusterConnection c = Mockito.mock(ClusterConnection.class);
-    Mockito.when(c.getConfiguration()).thenReturn(conf);
-    Mockito.doNothing().when(c).close();
-    // Make it so we return a particular location when asked.
-    final HRegionLocation loc = new HRegionLocation(HRegionInfo.FIRST_META_REGIONINFO,
-        ServerName.valueOf("example.org", 1234, 0));
-    Mockito.when(c.getRegionLocation((TableName) Mockito.any(),
-        (byte[]) Mockito.any(), Mockito.anyBoolean())).
-      thenReturn(loc);
-    Mockito.when(c.locateRegion((TableName) Mockito.any(), (byte[]) Mockito.any())).
-      thenReturn(loc);
-    ClientProtos.ClientService.BlockingInterface hri =
-      Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
-    Mockito.when(hri.bulkLoadHFile((RpcController)Mockito.any(), (BulkLoadHFileRequest)Mockito.any())).
-      thenThrow(new ServiceException(new IOException("injecting bulk load error")));
-    Mockito.when(c.getClient(Mockito.any(ServerName.class))).
-      thenReturn(hri);
-    return c;
-  }
-
-  /**
-   * This test exercises the path where there is a split after initial
-   * validation but before the atomic bulk load call. We cannot use presplitting
-   * to test this path, so we actually inject a split just before the atomic
-   * region load.
-   */
-  @Test (timeout=120000)
-  public void testSplitWhileBulkLoadPhase() throws Exception {
-    final TableName table = TableName.valueOf(name.getMethodName());
-    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
-      setupTable(connection, table, 10);
-      populateTable(connection, table,1);
-      assertExpectedTable(table, ROWCOUNT, 1);
-
-      // Now let's cause trouble.  This will occur after checks and cause bulk
-      // files to fail when attempt to atomically import.  This is recoverable.
-      final AtomicInteger attemptedCalls = new AtomicInteger();
-      LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
-        @Override
-        protected void bulkLoadPhase(final Table htable, final Connection conn,
-            ExecutorService pool, Deque<LoadQueueItem> queue,
-            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
-            Map<LoadQueueItem, ByteBuffer> item2RegionMap)
-                throws IOException {
-          int i = attemptedCalls.incrementAndGet();
-          if (i == 1) {
-            // On first attempt force a split.
-            forceSplit(table);
-          }
-          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
-        }
-      };
-
-      // create HFiles for different column families
-      try (Table t = connection.getTable(table);
-          RegionLocator locator = connection.getRegionLocator(table);
-          Admin admin = connection.getAdmin()) {
-        Path bulk = buildBulkFiles(table, 2);
-        lih2.doBulkLoad(bulk, admin, t, locator);
-      }
-
-      // check that data was loaded
-      // The three expected attempts are 1) failure because need to split, 2)
-      // load of split top 3) load of split bottom
-      assertEquals(attemptedCalls.get(), 3);
-      assertExpectedTable(table, ROWCOUNT, 2);
-    }
-  }
-
-  /**
-   * This test splits a table and attempts to bulk load.  The bulk import files
-   * should be split before atomically importing.
-   */
-  @Test (timeout=120000)
-  public void testGroupOrSplitPresplit() throws Exception {
-    final TableName table = TableName.valueOf(name.getMethodName());
-    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
-      setupTable(connection, table, 10);
-      populateTable(connection, table, 1);
-      assertExpectedTable(connection, table, ROWCOUNT, 1);
-      forceSplit(table);
-
-      final AtomicInteger countedLqis= new AtomicInteger();
-      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
-          util.getConfiguration()) {
-        @Override
-        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
-            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
-            final LoadQueueItem item, final Table htable,
-            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-          Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
-              startEndKeys);
-          if (lqis != null && lqis.getFirst() != null) {
-            countedLqis.addAndGet(lqis.getFirst().size());
-          }
-          return lqis;
-        }
-      };
-
-      // create HFiles for different column families
-      Path bulk = buildBulkFiles(table, 2);
-      try (Table t = connection.getTable(table);
-          RegionLocator locator = connection.getRegionLocator(table);
-          Admin admin = connection.getAdmin()) {
-        lih.doBulkLoad(bulk, admin, t, locator);
-      }
-      assertExpectedTable(connection, table, ROWCOUNT, 2);
-      assertEquals(20, countedLqis.get());
-    }
-  }
-
-  /**
-   * This test creates a table with many small regions.  The bulk load files
-   * would be splitted multiple times before all of them can be loaded successfully.
-   */
-  @Test (timeout=120000)
-  public void testSplitTmpFileCleanUp() throws Exception {
-    final TableName table = TableName.valueOf(name.getMethodName());
-    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
-        Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"),
-        Bytes.toBytes("row_00000040"), Bytes.toBytes("row_00000050")};
-    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
-      setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
-
-      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
-
-      // create HFiles
-      Path bulk = buildBulkFiles(table, 2);
-      try (Table t = connection.getTable(table);
-          RegionLocator locator = connection.getRegionLocator(table);
-          Admin admin = connection.getAdmin()) {
-        lih.doBulkLoad(bulk, admin, t, locator);
-      }
-      // family path
-      Path tmpPath = new Path(bulk, family(0));
-      // TMP_DIR under family path
-      tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
-      FileSystem fs = bulk.getFileSystem(util.getConfiguration());
-      // HFiles have been splitted, there is TMP_DIR
-      assertTrue(fs.exists(tmpPath));
-      // TMP_DIR should have been cleaned-up
-      assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
-        FSUtils.listStatus(fs, tmpPath));
-      assertExpectedTable(connection, table, ROWCOUNT, 2);
-    }
-  }
-
-  /**
-   * This simulates an remote exception which should cause LIHF to exit with an
-   * exception.
-   */
-  @Test(expected = IOException.class, timeout=120000)
-  public void testGroupOrSplitFailure() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
-      setupTable(connection, tableName, 10);
-
-      LoadIncrementalHFiles lih = new LoadIncrementalHFiles(
-          util.getConfiguration()) {
-        int i = 0;
-
-        @Override
-        protected Pair<List<LoadQueueItem>, String> groupOrSplit(
-            Multimap<ByteBuffer, LoadQueueItem> regionGroups,
-            final LoadQueueItem item, final Table table,
-            final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-          i++;
-
-          if (i == 5) {
-            throw new IOException("failure");
-          }
-          return super.groupOrSplit(regionGroups, item, table, startEndKeys);
-        }
-      };
-
-      // create HFiles for different column families
-      Path dir = buildBulkFiles(tableName,1);
-      try (Table t = connection.getTable(tableName);
-          RegionLocator locator = connection.getRegionLocator(tableName);
-          Admin admin = connection.getAdmin()) {
-        lih.doBulkLoad(dir, admin, t, locator);
-      }
-    }
-
-    fail("doBulkLoad should have thrown an exception");
-  }
-
-  @Test (timeout=120000)
-  public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
-    final TableName tableName = TableName.valueOf(name.getMethodName());
-    byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
-    // Share connection. We were failing to find the table with our new reverse scan because it
-    // looks for first region, not any region -- that is how it works now.  The below removes first
-    // region in test.  Was reliant on the Connection caching having first region.
-    Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
-    Table table = connection.getTable(tableName);
-
-    setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
-    Path dir = buildBulkFiles(tableName, 2);
-
-    final AtomicInteger countedLqis = new AtomicInteger();
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
-
-      @Override
-      protected Pair<List<LoadQueueItem>, String> groupOrSplit(
-          Multimap<ByteBuffer, LoadQueueItem> regionGroups,
-          final LoadQueueItem item, final Table htable,
-          final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
-        Pair<List<LoadQueueItem>, String> lqis = super.groupOrSplit(regionGroups, item, htable,
-            startEndKeys);
-        if (lqis != null && lqis.getFirst() != null) {
-          countedLqis.addAndGet(lqis.getFirst().size());
-        }
-        return lqis;
-      }
-    };
-
-    // do bulkload when there is no region hole in hbase:meta.
-    try (Table t = connection.getTable(tableName);
-        RegionLocator locator = connection.getRegionLocator(tableName);
-        Admin admin = connection.getAdmin()) {
-      loader.doBulkLoad(dir, admin, t, locator);
-    } catch (Exception e) {
-      LOG.error("exeception=", e);
-    }
-    // check if all the data are loaded into the table.
-    this.assertExpectedTable(tableName, ROWCOUNT, 2);
-
-    dir = buildBulkFiles(tableName, 3);
-
-    // Mess it up by leaving a hole in the hbase:meta
-    List<HRegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
-    for (HRegionInfo regionInfo : regionInfos) {
-      if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
-        MetaTableAccessor.deleteRegion(connection, regionInfo);
-        break;
-      }
-    }
-
-    try (Table t = connection.getTable(tableName);
-        RegionLocator locator = connection.getRegionLocator(tableName);
-        Admin admin = connection.getAdmin()) {
-      loader.doBulkLoad(dir, admin, t, locator);
-    } catch (Exception e) {
-      LOG.error("exception=", e);
-      assertTrue("IOException expected", e instanceof IOException);
-    }
-
-    table.close();
-
-    // Make sure at least the one region that still exists can be found.
-    regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
-    assertTrue(regionInfos.size() >= 1);
-
-    this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
-    connection.close();
-  }
-
-  /**
-   * Checks that all columns have the expected value and that there is the
-   * expected number of rows.
-   * @throws IOException
-   */
-  void assertExpectedTable(final Connection connection, TableName table, int count, int value)
-  throws IOException {
-    HTableDescriptor [] htds = util.getAdmin().listTables(table.getNameAsString());
-    assertEquals(htds.length, 1);
-    Table t = null;
-    try {
-      t = connection.getTable(table);
-      Scan s = new Scan();
-      ResultScanner sr = t.getScanner(s);
-      int i = 0;
-      for (Result r : sr) {
-        i++;
-        for (NavigableMap<byte[], byte[]> nm : r.getNoVersionMap().values()) {
-          for (byte[] val : nm.values()) {
-            assertTrue(Bytes.equals(val, value(value)));
-          }
-        }
-      }
-      assertEquals(count, i);
-    } catch (IOException e) {
-      fail("Failed due to exception");
-    } finally {
-      if (t != null) t.close();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
deleted file mode 100644
index 78fddbc..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFiles.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/**
- * Copyright The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.AccessControlLists;
-import org.apache.hadoop.hbase.security.access.SecureTestUtil;
-
-import org.junit.BeforeClass;
-import org.junit.experimental.categories.Category;
-
-/**
- * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode.
- * This suite is unable to verify the security handoff/turnover
- * as miniCluster is running as system user thus has root privileges
- * and delegation tokens don't seem to work on miniDFS.
- *
- * Thus SecureBulkload can only be completely verified by running
- * integration tests against a secure cluster. This suite is still
- * invaluable as it verifies the other mechanisms that need to be
- * supported as part of a LoadIncrementalFiles call.
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestSecureLoadIncrementalHFiles extends  TestLoadIncrementalHFiles{
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-    // set the always on security provider
-    UserProvider.setUserProviderForTesting(util.getConfiguration(),
-      HadoopSecurityEnabledUserProviderForTesting.class);
-    // setup configuration
-    SecureTestUtil.enableSecurity(util.getConfiguration());
-    util.getConfiguration().setInt(
-        LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
-        MAX_FILES_PER_REGION_PER_FAMILY);
-    // change default behavior so that tag values are returned with normal rpcs
-    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
-        KeyValueCodecWithTags.class.getCanonicalName());
-
-    util.startMiniCluster();
-
-    // Wait for the ACL table to become available
-    util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
-
-    setupNamespace();
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
deleted file mode 100644
index 0e877ad..0000000
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestSecureLoadIncrementalHFilesSplitRecovery.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MapReduceTests;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.access.AccessControlLists;
-import org.apache.hadoop.hbase.security.access.SecureTestUtil;
-
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-
-/**
- * Reruns TestSecureLoadIncrementalHFilesSplitRecovery
- * using LoadIncrementalHFiles in secure mode.
- * This suite is unable to verify the security handoff/turnove
- * as miniCluster is running as system user thus has root privileges
- * and delegation tokens don't seem to work on miniDFS.
- *
- * Thus SecureBulkload can only be completely verified by running
- * integration tests against a secure cluster. This suite is still
- * invaluable as it verifies the other mechanisms that need to be
- * supported as part of a LoadIncrementalFiles call.
- */
-@Category({MapReduceTests.class, LargeTests.class})
-public class TestSecureLoadIncrementalHFilesSplitRecovery extends TestLoadIncrementalHFilesSplitRecovery {
-
-  //This "overrides" the parent static method
-  //make sure they are in sync
-  @BeforeClass
-  public static void setupCluster() throws Exception {
-    util = new HBaseTestingUtility();
-    // set the always on security provider
-    UserProvider.setUserProviderForTesting(util.getConfiguration(),
-      HadoopSecurityEnabledUserProviderForTesting.class);
-    // setup configuration
-    SecureTestUtil.enableSecurity(util.getConfiguration());
-
-    util.startMiniCluster();
-
-    // Wait for the ACL table to become available
-    util.waitTableEnabled(AccessControlLists.ACL_TABLE_NAME);
-  }
-
-  //Disabling this test as it does not work in secure mode
-  @Test (timeout=180000)
-  @Override
-  public void testBulkLoadPhaseFailure() {
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
index 98d03c0..df9f4ff 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestMobSecureExportSnapshot.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hbase.snapshot;
 
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
 import org.apache.hadoop.hbase.security.access.SecureTestUtil;

http://git-wip-us.apache.org/repos/asf/hbase/blob/9e53f292/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
index 7d4832c..def0838 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/snapshot/TestSecureExportSnapshot.java
@@ -20,7 +20,7 @@
 package org.apache.hadoop.hbase.snapshot;
 
 import org.apache.hadoop.hbase.CategoryBasedTimeout;
-import org.apache.hadoop.hbase.mapreduce.HadoopSecurityEnabledUserProviderForTesting;
+import org.apache.hadoop.hbase.security.HadoopSecurityEnabledUserProviderForTesting;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
 import org.apache.hadoop.hbase.security.access.SecureTestUtil;