Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/27 01:56:44 UTC

hbase git commit: HBASE-16672 Add option for bulk load to copy hfile(s) instead of renaming

Repository: hbase
Updated Branches:
  refs/heads/master b9ec59ebb -> 219c78645


HBASE-16672 Add option for bulk load to copy hfile(s) instead of renaming


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/219c7864
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/219c7864
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/219c7864

Branch: refs/heads/master
Commit: 219c7864575be3ab0a1f67ddd48c5852377c2ed2
Parents: b9ec59e
Author: tedyu <yu...@gmail.com>
Authored: Mon Sep 26 18:56:38 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Mon Sep 26 18:56:38 2016 -0700

----------------------------------------------------------------------
 .../hbase/client/SecureBulkLoadClient.java      |  23 +-
 .../hadoop/hbase/protobuf/RequestConverter.java |  22 ++
 .../hbase/protobuf/generated/ClientProtos.java  | 234 +++++++++++++------
 hbase-protocol/src/main/protobuf/Client.proto   |   1 +
 .../hbase/mapreduce/LoadIncrementalHFiles.java  |  53 +++--
 .../hadoop/hbase/regionserver/HRegion.java      |   8 +-
 .../hbase/regionserver/RSRpcServices.java       |   3 +-
 .../hadoop/hbase/regionserver/Region.java       |  19 +-
 .../regionserver/SecureBulkLoadManager.java     |   8 +-
 .../mapreduce/TestLoadIncrementalHFiles.java    |  35 ++-
 .../TestLoadIncrementalHFilesSplitRecovery.java |  13 +-
 11 files changed, 313 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
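
Taken together, the patch threads one new boolean end to end: SecureBulkLoadClient and RequestConverter gain copyFiles overloads, the BulkLoadHFileRequest protobuf message gains an optional copy_file field (default false), and on the server side the flag is passed through RSRpcServices and Region/HRegion down to the SecureBulkLoadManager listener, which copies the source hfile into the staging directory instead of renaming it, leaving the original in place. LoadIncrementalHFiles exposes the behaviour through a new doBulkLoad overload and the new "always.copy.files" configuration key.

A minimal usage sketch against the new doBulkLoad overload added below; illustrative only, the table name and hfile directory are placeholders:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.RegionLocator;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

  public class BulkLoadCopyExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      TableName tn = TableName.valueOf("mytable");   // placeholder table
      try (Connection conn = ConnectionFactory.createConnection(conf);
           Admin admin = conn.getAdmin();
           Table table = conn.getTable(tn);
           RegionLocator locator = conn.getRegionLocator(tn)) {
        LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
        // silence=false: do not ignore unmatched column families;
        // copyFile=true: copy hfiles into place instead of renaming them.
        loader.doBulkLoad(new Path("/staging/hfiles"), admin, table, locator, false, true);
      }
    }
  }

The same switch is reachable from the command-line tool through the always.copy.files key parsed in LoadIncrementalHFiles#run(); see the sketch after that section.
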


http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index eddf8f1..5af8034 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -113,9 +113,30 @@ public class SecureBulkLoadClient {
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) throws IOException {
+    return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
+        false);
+  }
+
+  /**
+   * Securely bulk load a list of HFiles using client protocol.
+   *
+   * @param client the ClientService stub of the region server to call
+   * @param familyPaths list of (column family, HFile path) pairs to load
+   * @param regionName name of the region the HFiles are loaded into
+   * @param assignSeqNum whether a sequence id should be assigned to the loaded files
+   * @param userToken delegation token carrying the caller's filesystem credentials
+   * @param bulkToken staging token returned by the prepare-bulk-load call
+   * @param copyFiles if true, copy the HFiles into place instead of renaming them
+   * @return true if all are loaded
+   * @throws IOException if the bulk load call fails
+   */
+  public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
     BulkLoadHFileRequest request =
         RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
-          userToken, bulkToken);
+          userToken, bulkToken, copyFiles);
 
     try {
       BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index b75d2b8..860e0e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -522,12 +522,33 @@ public final class RequestConverter {
    * @param familyPaths
    * @param regionName
    * @param assignSeqNum
+   * @param userToken delegation token carrying the caller's filesystem credentials
+   * @param bulkToken staging token returned by the prepare-bulk-load call
    * @return a bulk load request
    */
   public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
       final List<Pair<byte[], String>> familyPaths,
       final byte[] regionName, boolean assignSeqNum,
       final Token<?> userToken, final String bulkToken) {
+    return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
+        false);
+  }
+
+  /**
+   * Create a protocol buffer bulk load request
+   *
+   * @param familyPaths list of (column family, HFile path) pairs to load
+   * @param regionName name of the region the HFiles are loaded into
+   * @param assignSeqNum whether a sequence id should be assigned to the loaded files
+   * @param userToken delegation token carrying the caller's filesystem credentials
+   * @param bulkToken staging token returned by the prepare-bulk-load call
+   * @param copyFiles if true, request that the HFiles be copied instead of renamed
+   * @return a bulk load request
+   */
+  public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
+      final List<Pair<byte[], String>> familyPaths,
+      final byte[] regionName, boolean assignSeqNum,
+      final Token<?> userToken, final String bulkToken, boolean copyFiles) {
     RegionSpecifier region = RequestConverter.buildRegionSpecifier(
       RegionSpecifierType.REGION_NAME, regionName);
 
@@ -560,6 +581,7 @@ public final class RequestConverter {
     if (bulkToken != null) {
       request.setBulkToken(bulkToken);
     }
+    request.setCopyFile(copyFiles);
     return request.build();
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 4e9e0b4..f0141df 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -21077,6 +21077,16 @@ public final class ClientProtos {
      */
     com.google.protobuf.ByteString
         getBulkTokenBytes();
+
+    // optional bool copy_file = 6 [default = false];
+    /**
+     * <code>optional bool copy_file = 6 [default = false];</code>
+     */
+    boolean hasCopyFile();
+    /**
+     * <code>optional bool copy_file = 6 [default = false];</code>
+     */
+    boolean getCopyFile();
   }
   /**
    * Protobuf type {@code hbase.pb.BulkLoadHFileRequest}
@@ -21179,6 +21189,11 @@ public final class ClientProtos {
               bulkToken_ = input.readBytes();
               break;
             }
+            case 48: {
+              bitField0_ |= 0x00000010;
+              copyFile_ = input.readBool();
+              break;
+            }
           }
         }
       } catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -21979,12 +21994,29 @@ public final class ClientProtos {
       }
     }
 
+    // optional bool copy_file = 6 [default = false];
+    public static final int COPY_FILE_FIELD_NUMBER = 6;
+    private boolean copyFile_;
+    /**
+     * <code>optional bool copy_file = 6 [default = false];</code>
+     */
+    public boolean hasCopyFile() {
+      return ((bitField0_ & 0x00000010) == 0x00000010);
+    }
+    /**
+     * <code>optional bool copy_file = 6 [default = false];</code>
+     */
+    public boolean getCopyFile() {
+      return copyFile_;
+    }
+
     private void initFields() {
       region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
       familyPath_ = java.util.Collections.emptyList();
       assignSeqNum_ = false;
       fsToken_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken.getDefaultInstance();
       bulkToken_ = "";
+      copyFile_ = false;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -22027,6 +22059,9 @@ public final class ClientProtos {
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         output.writeBytes(5, getBulkTokenBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        output.writeBool(6, copyFile_);
+      }
       getUnknownFields().writeTo(output);
     }
 
@@ -22056,6 +22091,10 @@ public final class ClientProtos {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(5, getBulkTokenBytes());
       }
+      if (((bitField0_ & 0x00000010) == 0x00000010)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBoolSize(6, copyFile_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -22101,6 +22140,11 @@ public final class ClientProtos {
         result = result && getBulkToken()
             .equals(other.getBulkToken());
       }
+      result = result && (hasCopyFile() == other.hasCopyFile());
+      if (hasCopyFile()) {
+        result = result && (getCopyFile()
+            == other.getCopyFile());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -22134,6 +22178,10 @@ public final class ClientProtos {
         hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER;
         hash = (53 * hash) + getBulkToken().hashCode();
       }
+      if (hasCopyFile()) {
+        hash = (37 * hash) + COPY_FILE_FIELD_NUMBER;
+        hash = (53 * hash) + hashBoolean(getCopyFile());
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       memoizedHashCode = hash;
       return hash;
@@ -22274,6 +22322,8 @@ public final class ClientProtos {
         bitField0_ = (bitField0_ & ~0x00000008);
         bulkToken_ = "";
         bitField0_ = (bitField0_ & ~0x00000010);
+        copyFile_ = false;
+        bitField0_ = (bitField0_ & ~0x00000020);
         return this;
       }
 
@@ -22335,6 +22385,10 @@ public final class ClientProtos {
           to_bitField0_ |= 0x00000008;
         }
         result.bulkToken_ = bulkToken_;
+        if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+          to_bitField0_ |= 0x00000010;
+        }
+        result.copyFile_ = copyFile_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -22391,6 +22445,9 @@ public final class ClientProtos {
           bulkToken_ = other.bulkToken_;
           onChanged();
         }
+        if (other.hasCopyFile()) {
+          setCopyFile(other.getCopyFile());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -23013,6 +23070,39 @@ public final class ClientProtos {
         return this;
       }
 
+      // optional bool copy_file = 6 [default = false];
+      private boolean copyFile_ ;
+      /**
+       * <code>optional bool copy_file = 6 [default = false];</code>
+       */
+      public boolean hasCopyFile() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      /**
+       * <code>optional bool copy_file = 6 [default = false];</code>
+       */
+      public boolean getCopyFile() {
+        return copyFile_;
+      }
+      /**
+       * <code>optional bool copy_file = 6 [default = false];</code>
+       */
+      public Builder setCopyFile(boolean value) {
+        bitField0_ |= 0x00000020;
+        copyFile_ = value;
+        onChanged();
+        return this;
+      }
+      /**
+       * <code>optional bool copy_file = 6 [default = false];</code>
+       */
+      public Builder clearCopyFile() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        copyFile_ = false;
+        onChanged();
+        return this;
+      }
+
       // @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest)
     }
 
@@ -39320,81 +39410,81 @@ public final class ClientProtos {
       "_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" +
       "ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" +
       "scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" +
-      "s\"\206\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
+      "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
       "(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" +
       "path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" +
       "st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" +
       "\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" +
-      "en\022\022\n\nbulk_token\030\005 \001(\t\032*\n\nFamilyPath\022\016\n\006" +
-      "family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF",
-      "ileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegatio" +
-      "nToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002" +
-      " \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026P" +
-      "repareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(" +
-      "\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031" +
-      ".hbase.pb.RegionSpecifier\"-\n\027PrepareBulk" +
-      "LoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Clea" +
-      "nupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)" +
-      "\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecifie" +
-      "r\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproces",
-      "sorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_n" +
-      "ame\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reques" +
-      "t\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005v" +
-      "alue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031C" +
-      "oprocessorServiceRequest\022)\n\006region\030\001 \002(\013" +
-      "2\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(" +
-      "\0132 .hbase.pb.CoprocessorServiceCall\"o\n\032C" +
-      "oprocessorServiceResponse\022)\n\006region\030\001 \002(" +
-      "\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 " +
-      "\002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022",
-      "\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase" +
-      ".pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.p" +
-      "b.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.C" +
-      "oprocessorServiceCall\"k\n\014RegionAction\022)\n" +
-      "\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" +
-      "\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" +
-      ".pb.Action\"c\n\017RegionLoadStats\022\027\n\014memstor" +
-      "eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010" +
-      "\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Multi" +
-      "RegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.",
-      "pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase" +
-      ".pb.RegionLoadStats\"\336\001\n\021ResultOrExceptio" +
-      "n\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase" +
-      ".pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb" +
-      ".NameBytesPair\022:\n\016service_result\030\004 \001(\0132\"" +
-      ".hbase.pb.CoprocessorServiceResult\0220\n\tlo" +
-      "adStats\030\005 \001(\0132\031.hbase.pb.RegionLoadStats" +
-      "B\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOrEx" +
-      "ception\030\001 \003(\0132\033.hbase.pb.ResultOrExcepti" +
-      "on\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameByt",
-      "esPair\"x\n\014MultiRequest\022,\n\014regionAction\030\001" +
-      " \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGro" +
-      "up\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.C" +
-      "ondition\"\226\001\n\rMultiResponse\0228\n\022regionActi" +
-      "onResult\030\001 \003(\0132\034.hbase.pb.RegionActionRe" +
-      "sult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatist" +
-      "ics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStat" +
-      "s*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE" +
-      "\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.G" +
-      "etRequest\032\025.hbase.pb.GetResponse\022;\n\006Muta",
-      "te\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.M" +
-      "utateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReq" +
-      "uest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoad" +
-      "HFile\022\036.hbase.pb.BulkLoadHFileRequest\032\037." +
-      "hbase.pb.BulkLoadHFileResponse\022V\n\017Prepar" +
-      "eBulkLoad\022 .hbase.pb.PrepareBulkLoadRequ" +
-      "est\032!.hbase.pb.PrepareBulkLoadResponse\022V" +
-      "\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBulk" +
-      "LoadRequest\032!.hbase.pb.CleanupBulkLoadRe" +
-      "sponse\022X\n\013ExecService\022#.hbase.pb.Coproce",
-      "ssorServiceRequest\032$.hbase.pb.Coprocesso" +
-      "rServiceResponse\022d\n\027ExecRegionServerServ" +
-      "ice\022#.hbase.pb.CoprocessorServiceRequest" +
-      "\032$.hbase.pb.CoprocessorServiceResponse\0228" +
-      "\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbase." +
-      "pb.MultiResponseBB\n*org.apache.hadoop.hb" +
-      "ase.protobuf.generatedB\014ClientProtosH\001\210\001" +
-      "\001\240\001\001"
+      "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(" +
+      "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014",
+      "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" +
+      "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" +
+      "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" +
+      " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" +
+      "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" +
+      "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" +
+      "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" +
+      "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" +
+      "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013" +
+      "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu",
+      "lkLoadResponse\"a\n\026CoprocessorServiceCall" +
+      "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" +
+      "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" +
+      "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" +
+      "base.pb.NameBytesPair\"v\n\031CoprocessorServ" +
+      "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" +
+      "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" +
+      "oprocessorServiceCall\"o\n\032CoprocessorServ" +
+      "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" +
+      "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb",
+      ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" +
+      "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" +
+      "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" +
+      "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" +
+      "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" +
+      "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" +
+      "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" +
+      "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" +
+      "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction" +
+      "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat",
+      "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" +
+      "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" +
+      "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
+      "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" +
+      "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" +
+      "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" +
+      "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" +
+      "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" +
+      "ActionResult\0226\n\021resultOrException\030\001 \003(\0132" +
+      "\033.hbase.pb.ResultOrException\022*\n\texceptio",
+      "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" +
+      "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" +
+      "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" +
+      "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" +
+      "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" +
+      "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" +
+      "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" +
+      "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" +
+      "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS" +
+      "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb",
+      "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." +
+      "MutateRequest\032\030.hbase.pb.MutateResponse\022" +
+      "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" +
+      "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." +
+      "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" +
+      "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" +
+      "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" +
+      ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" +
+      "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!." +
+      "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec",
+      "Service\022#.hbase.pb.CoprocessorServiceReq" +
+      "uest\032$.hbase.pb.CoprocessorServiceRespon" +
+      "se\022d\n\027ExecRegionServerService\022#.hbase.pb" +
+      ".CoprocessorServiceRequest\032$.hbase.pb.Co" +
+      "processorServiceResponse\0228\n\005Multi\022\026.hbas" +
+      "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" +
+      "seBB\n*org.apache.hadoop.hbase.protobuf.g" +
+      "eneratedB\014ClientProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -39502,7 +39592,7 @@ public final class ClientProtos {
           internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_hbase_pb_BulkLoadHFileRequest_descriptor,
-              new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", });
+              new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "CopyFile", });
           internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor =
             internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0);
           internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index adb66f7..6c0c00c 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -339,6 +339,7 @@ message BulkLoadHFileRequest {
   optional bool assign_seq_num = 3;
   optional DelegationToken fs_token = 4;
   optional string bulk_token = 5;
+  optional bool copy_file = 6 [default = false];
 
   message FamilyPath {
     required bytes family = 1;

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index e3542ef8..c831efd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -115,6 +115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
   public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
+  public final static String ALWAYS_COPY_FILES = "always.copy.files";
 
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
@@ -328,7 +329,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
       RegionLocator regionLocator) throws TableNotFoundException, IOException {
-    doBulkLoad(hfofDir, admin, table, regionLocator, false);
+    doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
   }
 
   void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
@@ -360,10 +361,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param table the table to load into
    * @param regionLocator region locator
    * @param silence true to ignore unmatched column families
+   * @param copyFile always copy hfiles if true
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
-          RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+          RegionLocator regionLocator, boolean silence, boolean copyFile)
+              throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
@@ -386,7 +389,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           break;
         }
       }
-      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
     } finally {
       cleanup(admin, queue, pool, secureClient);
     }
@@ -402,10 +405,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * @param table the table to load into
    * @param regionLocator region locator
    * @param silence true to ignore unmatched column families
+   * @param copyFile always copy hfiles if true
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+      RegionLocator regionLocator, boolean silence, boolean copyFile)
+          throws TableNotFoundException, IOException {
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
     }
@@ -437,7 +442,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
       pool = createExecutorService();
       secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
-      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+      performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
     } finally {
       cleanup(admin, queue, pool, secureClient);
     }
@@ -445,7 +450,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
       Deque<LoadQueueItem> queue, ExecutorService pool,
-      SecureBulkLoadClient secureClient) throws IOException {
+      SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
     int count = 0;
 
     if(isSecureBulkLoadEndpointAvailable()) {
@@ -486,7 +491,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
             + " hfiles to one family of one region");
       }
 
-      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+      bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile);
 
       // NOTE: The next iteration's split / group could happen in parallel to
       // atomic bulkloads assuming that there are splits and no merges, and
@@ -599,12 +604,29 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
       Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+    loadHFileQueue(table, conn, queue, startEndKeys, false);
+  }
+
+  /**
+   * Used by the replication sink to load the hfiles from the source cluster. It does the following,
+   * <ol>
+   * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
+   * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+   * </li>
+   * </ol>
+   * @param table Table to which these hfiles should be loaded to
+   * @param conn Connection to use
+   * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
+   * @param startEndKeys starting and ending row keys of the region
+   */
+  public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
+      Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
     ExecutorService pool = null;
     try {
       pool = createExecutorService();
       Multimap<ByteBuffer, LoadQueueItem> regionGroups =
           groupOrSplitPhase(table, pool, queue, startEndKeys);
-      bulkLoadPhase(table, conn, pool, queue, regionGroups);
+      bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile);
     } finally {
       if (pool != null) {
         pool.shutdown();
@@ -619,7 +641,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   protected void bulkLoadPhase(final Table table, final Connection conn,
       ExecutorService pool, Deque<LoadQueueItem> queue,
-      final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+      final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile) throws IOException {
     // atomically bulk load the groups.
     Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
     for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
@@ -630,7 +652,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
         @Override
         public List<LoadQueueItem> call() throws Exception {
           List<LoadQueueItem> toRetry =
-              tryAtomicRegionLoad(conn, table.getName(), first, lqis);
+              tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
           return toRetry;
         }
       };
@@ -890,8 +912,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    *   failure
    */
   protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
-  throws IOException {
+      final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
+      boolean copyFile) throws IOException {
     final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
       if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
@@ -911,7 +933,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
           try (Table table = conn.getTable(getTableName())) {
             secureClient = new SecureBulkLoadClient(getConf(), table);
             success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
-                  assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
+                  assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
           }
           return success;
         } finally {
@@ -1172,10 +1194,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       try (Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
         boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+        boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
         if (dirPath != null) {
-          doBulkLoad(hfofDir, admin, table, locator, silence);
+          doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
         } else {
-          doBulkLoad(map, admin, table, locator, silence);
+          doBulkLoad(map, admin, table, locator, silence, copyFiles);
         }
       }
     }
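
The run() path above only enables copies when always.copy.files is literally "yes" (case-insensitive). A small driver sketch for invoking the tool programmatically with copies enabled; illustrative only, the staging directory and table name are placeholders:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
  import org.apache.hadoop.util.ToolRunner;

  public class BulkLoadCopyDriver {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      // run() checks "yes".equalsIgnoreCase(...), so set the string "yes" here.
      conf.set(LoadIncrementalHFiles.ALWAYS_COPY_FILES, "yes");
      int exit = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
          new String[] { "/staging/hfiles", "mytable" });
      System.exit(exit);
    }
  }

When the tool is launched through its own main(), passing -Dalways.copy.files=yes on the command line should have the same effect, since the generic options end up in the tool's configuration.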

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index debaec9..f6d2e36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5422,6 +5422,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException {
+    return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+  }
+
+  @Override
+  public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
+      BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
     long seqId = -1;
     Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
     Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
@@ -5503,7 +5509,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         try {
           String finalPath = path;
           if (bulkLoadListener != null) {
-            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
+            finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
           }
           Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5ba8afd..541680c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2078,7 +2078,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
         }
         if (!bypass) {
-          loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+          loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
+              request.getCopyFile());
         }
         if (region.getCoprocessorHost() != null) {
           loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index efd68b8..4e76e76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -512,7 +512,6 @@ public interface Region extends ConfigurationObserver {
    * pre/post processing of a given bulkload call
    */
   interface BulkLoadListener {
-
     /**
      * Called before an HFile is actually loaded
      * @param family family being loaded to
@@ -520,7 +519,8 @@ public interface Region extends ConfigurationObserver {
      * @return final path to be used for actual loading
      * @throws IOException
      */
-    String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
+    String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
+        throws IOException;
 
     /**
      * Called after a successful HFile load
@@ -553,6 +553,21 @@ public interface Region extends ConfigurationObserver {
   boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
       BulkLoadListener bulkLoadListener) throws IOException;
 
+  /**
+   * Attempts to atomically load a group of hfiles.  This is critical for loading
+   * rows with multiple column families atomically.
+   *
+   * @param familyPaths List of Pair&lt;byte[] column family, String hfilePath&gt;
+   * @param assignSeqId
+   * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
+   * file about to be bulk loaded
+   * @param copyFile always copy hfiles if true
+   * @return true if successful, false if failed recoverably
+   * @throws IOException if failed unrecoverably.
+   */
+  boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
+      BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException;
+
   ///////////////////////////////////////////////////////////////////////////
   // Coprocessors
 

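Note that prepareBulkLoad gaining a copyFile argument changes the BulkLoadListener contract, so any implementation outside SecureBulkLoadManager has to adopt the new signature. A minimal pass-through implementor might look like the sketch below; it assumes the interface's other callbacks, doneBulkLoad and failedBulkLoad, keep their existing two-argument form (they are not touched by this patch):

  import java.io.IOException;
  import org.apache.hadoop.hbase.regionserver.Region;

  public class PassThroughBulkLoadListener implements Region.BulkLoadListener {
    @Override
    public String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
        throws IOException {
      // Ignore copyFile and load straight from the source path, no staging step.
      return srcPath;
    }
    @Override
    public void doneBulkLoad(byte[] family, String srcPath) throws IOException {
      // No staging file to clean up.
    }
    @Override
    public void failedBulkLoad(byte[] family, String srcPath) throws IOException {
      // Nothing to roll back.
    }
  }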
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 0d64e4e..88c993e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -233,7 +233,7 @@ public class SecureBulkLoadManager {
             //We call bulkLoadHFiles as requesting user
             //To enable access prior to staging
             return region.bulkLoadHFiles(familyPaths, true,
-                new SecureBulkLoadListener(fs, bulkToken, conf));
+                new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
           } catch (Exception e) {
             LOG.error("Failed to complete bulk load", e);
           }
@@ -305,7 +305,8 @@ public class SecureBulkLoadManager {
     }
 
     @Override
-    public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
+    public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile)
+        throws IOException {
       Path p = new Path(srcPath);
       Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
 
@@ -329,6 +330,9 @@ public class SecureBulkLoadManager {
         LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
             "the destination filesystem. Copying file over to destination staging dir.");
         FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+      } else if (copyFile) {
+        LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
+        FileUtil.copy(srcFs, p, fs, stageP, false, conf);
       } else {
         LOG.debug("Moving " + p + " to " + stageP);
         FileStatus origFileStatus = fs.getFileStatus(p);

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 0b96720..88b9247 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -131,6 +131,17 @@ public class TestLoadIncrementalHFiles {
     });
   }
 
+  @Test(timeout = 120000)
+  public void testSimpleLoadWithFileCopy() throws Exception {
+    String testName = "mytable_testSimpleLoadWithFileCopy";
+    final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
+    runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
+        false, null, new byte[][][] {
+          new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+          new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+    }, false, true);
+  }
+
   /**
    * Test case that creates some regions and loads
    * HFiles that cross the boundaries of those regions
@@ -291,12 +302,12 @@ public class TestLoadIncrementalHFiles {
       boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
           throws Exception {
     HTableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap);
+    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
   }
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
-          throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+      boolean copyFiles) throws Exception {
     Path dir = util.getDataTestDirOnTestFS(testName);
     FileSystem fs = util.getTestFileSystem();
     dir = dir.makeQualified(fs);
@@ -305,9 +316,11 @@ public class TestLoadIncrementalHFiles {
     int hfileIdx = 0;
     Map<byte[], List<Path>> map = null;
     List<Path> list = null;
+    if (useMap || copyFiles) {
+      list = new ArrayList<>();
+    }
     if (useMap) {
       map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
-      list = new ArrayList<>();
       map.put(FAMILY, list);
     }
     for (byte[][] range : hfileRanges) {
@@ -326,7 +339,11 @@ public class TestLoadIncrementalHFiles {
     }
 
     final TableName tableName = htd.getTableName();
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+    Configuration conf = util.getConfiguration();
+    if (copyFiles) {
+      conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+    }
+    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
     String [] args= {dir.toString(), tableName.toString()};
     if (useMap) {
       loader.run(null, map, tableName);
@@ -334,6 +351,12 @@ public class TestLoadIncrementalHFiles {
       loader.run(args);
     }
 
+    if (copyFiles) {
+      for (Path p : list) {
+        assertTrue(fs.exists(p));
+      }
+    }
+
     Table table = util.getConnection().getTable(tableName);
     try {
       assertEquals(expectedRows, util.countRows(table));
@@ -419,7 +442,7 @@ public class TestLoadIncrementalHFiles {
     htd.addFamily(family);
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false);
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);

http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 66d7eb1..90a1409 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -281,8 +281,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
       LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
         @Override
         protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
-            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
-                throws IOException {
+            TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
+            boolean copyFile) throws IOException {
           int i = attmptedCalls.incrementAndGet();
           if (i == 1) {
             Connection errConn;
@@ -293,10 +293,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
               throw new RuntimeException("mocking cruft, should never happen");
             }
             failedCalls.incrementAndGet();
-            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
+            return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, copyFile);
           }
 
-          return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+          return super.tryAtomicRegionLoad(conn, tableName, first, lqis, copyFile);
         }
       };
       try {
@@ -359,13 +359,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
         @Override
         protected void bulkLoadPhase(final Table htable, final Connection conn,
             ExecutorService pool, Deque<LoadQueueItem> queue,
-            final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+            final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile)
+                throws IOException {
           int i = attemptedCalls.incrementAndGet();
           if (i == 1) {
             // On first attempt force a split.
             forceSplit(table);
           }
-          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
+          super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile);
         }
       };