You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/27 01:56:44 UTC
hbase git commit: HBASE-16672 Add option for bulk load to copy
hfile(s) instead of renaming
Repository: hbase
Updated Branches:
refs/heads/master b9ec59ebb -> 219c78645
HBASE-16672 Add option for bulk load to copy hfile(s) instead of renaming
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/219c7864
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/219c7864
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/219c7864
Branch: refs/heads/master
Commit: 219c7864575be3ab0a1f67ddd48c5852377c2ed2
Parents: b9ec59e
Author: tedyu <yu...@gmail.com>
Authored: Mon Sep 26 18:56:38 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Mon Sep 26 18:56:38 2016 -0700
----------------------------------------------------------------------
.../hbase/client/SecureBulkLoadClient.java | 23 +-
.../hadoop/hbase/protobuf/RequestConverter.java | 22 ++
.../hbase/protobuf/generated/ClientProtos.java | 234 +++++++++++++------
hbase-protocol/src/main/protobuf/Client.proto | 1 +
.../hbase/mapreduce/LoadIncrementalHFiles.java | 53 +++--
.../hadoop/hbase/regionserver/HRegion.java | 8 +-
.../hbase/regionserver/RSRpcServices.java | 3 +-
.../hadoop/hbase/regionserver/Region.java | 19 +-
.../regionserver/SecureBulkLoadManager.java | 8 +-
.../mapreduce/TestLoadIncrementalHFiles.java | 35 ++-
.../TestLoadIncrementalHFilesSplitRecovery.java | 13 +-
11 files changed, 313 insertions(+), 106 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
index eddf8f1..5af8034 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
@@ -113,9 +113,30 @@ public class SecureBulkLoadClient {
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) throws IOException {
+ return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
+ false);
+ }
+
+ /**
+ * Securely bulk load a list of HFiles using client protocol.
+ *
+ * @param client
+ * @param familyPaths
+ * @param regionName
+ * @param assignSeqNum
+ * @param userToken
+ * @param bulkToken
+ * @param copyFiles
+ * @return true if all are loaded
+ * @throws IOException
+ */
+ public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
+ final List<Pair<byte[], String>> familyPaths,
+ final byte[] regionName, boolean assignSeqNum,
+ final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
BulkLoadHFileRequest request =
RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
- userToken, bulkToken);
+ userToken, bulkToken, copyFiles);
try {
BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
index b75d2b8..860e0e4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
@@ -522,12 +522,33 @@ public final class RequestConverter {
* @param familyPaths
* @param regionName
* @param assignSeqNum
+ * @param userToken
+ * @param bulkToken
* @return a bulk load request
*/
public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
final List<Pair<byte[], String>> familyPaths,
final byte[] regionName, boolean assignSeqNum,
final Token<?> userToken, final String bulkToken) {
+ return buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum, userToken, bulkToken,
+ false);
+ }
+
+ /**
+ * Create a protocol buffer bulk load request
+ *
+ * @param familyPaths
+ * @param regionName
+ * @param assignSeqNum
+ * @param userToken
+ * @param bulkToken
+ * @param copyFiles
+ * @return a bulk load request
+ */
+ public static BulkLoadHFileRequest buildBulkLoadHFileRequest(
+ final List<Pair<byte[], String>> familyPaths,
+ final byte[] regionName, boolean assignSeqNum,
+ final Token<?> userToken, final String bulkToken, boolean copyFiles) {
RegionSpecifier region = RequestConverter.buildRegionSpecifier(
RegionSpecifierType.REGION_NAME, regionName);
@@ -560,6 +581,7 @@ public final class RequestConverter {
if (bulkToken != null) {
request.setBulkToken(bulkToken);
}
+ request.setCopyFile(copyFiles);
return request.build();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
index 4e9e0b4..f0141df 100644
--- a/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
+++ b/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/ClientProtos.java
@@ -21077,6 +21077,16 @@ public final class ClientProtos {
*/
com.google.protobuf.ByteString
getBulkTokenBytes();
+
+ // optional bool copy_file = 6 [default = false];
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ boolean hasCopyFile();
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ boolean getCopyFile();
}
/**
* Protobuf type {@code hbase.pb.BulkLoadHFileRequest}
@@ -21179,6 +21189,11 @@ public final class ClientProtos {
bulkToken_ = input.readBytes();
break;
}
+ case 48: {
+ bitField0_ |= 0x00000010;
+ copyFile_ = input.readBool();
+ break;
+ }
}
}
} catch (com.google.protobuf.InvalidProtocolBufferException e) {
@@ -21979,12 +21994,29 @@ public final class ClientProtos {
}
}
+ // optional bool copy_file = 6 [default = false];
+ public static final int COPY_FILE_FIELD_NUMBER = 6;
+ private boolean copyFile_;
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ public boolean hasCopyFile() {
+ return ((bitField0_ & 0x00000010) == 0x00000010);
+ }
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ public boolean getCopyFile() {
+ return copyFile_;
+ }
+
private void initFields() {
region_ = org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.getDefaultInstance();
familyPath_ = java.util.Collections.emptyList();
assignSeqNum_ = false;
fsToken_ = org.apache.hadoop.hbase.protobuf.generated.ClientProtos.DelegationToken.getDefaultInstance();
bulkToken_ = "";
+ copyFile_ = false;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -22027,6 +22059,9 @@ public final class ClientProtos {
if (((bitField0_ & 0x00000008) == 0x00000008)) {
output.writeBytes(5, getBulkTokenBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ output.writeBool(6, copyFile_);
+ }
getUnknownFields().writeTo(output);
}
@@ -22056,6 +22091,10 @@ public final class ClientProtos {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(5, getBulkTokenBytes());
}
+ if (((bitField0_ & 0x00000010) == 0x00000010)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBoolSize(6, copyFile_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -22101,6 +22140,11 @@ public final class ClientProtos {
result = result && getBulkToken()
.equals(other.getBulkToken());
}
+ result = result && (hasCopyFile() == other.hasCopyFile());
+ if (hasCopyFile()) {
+ result = result && (getCopyFile()
+ == other.getCopyFile());
+ }
result = result &&
getUnknownFields().equals(other.getUnknownFields());
return result;
@@ -22134,6 +22178,10 @@ public final class ClientProtos {
hash = (37 * hash) + BULK_TOKEN_FIELD_NUMBER;
hash = (53 * hash) + getBulkToken().hashCode();
}
+ if (hasCopyFile()) {
+ hash = (37 * hash) + COPY_FILE_FIELD_NUMBER;
+ hash = (53 * hash) + hashBoolean(getCopyFile());
+ }
hash = (29 * hash) + getUnknownFields().hashCode();
memoizedHashCode = hash;
return hash;
@@ -22274,6 +22322,8 @@ public final class ClientProtos {
bitField0_ = (bitField0_ & ~0x00000008);
bulkToken_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
+ copyFile_ = false;
+ bitField0_ = (bitField0_ & ~0x00000020);
return this;
}
@@ -22335,6 +22385,10 @@ public final class ClientProtos {
to_bitField0_ |= 0x00000008;
}
result.bulkToken_ = bulkToken_;
+ if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
+ to_bitField0_ |= 0x00000010;
+ }
+ result.copyFile_ = copyFile_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -22391,6 +22445,9 @@ public final class ClientProtos {
bulkToken_ = other.bulkToken_;
onChanged();
}
+ if (other.hasCopyFile()) {
+ setCopyFile(other.getCopyFile());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -23013,6 +23070,39 @@ public final class ClientProtos {
return this;
}
+ // optional bool copy_file = 6 [default = false];
+ private boolean copyFile_ ;
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ public boolean hasCopyFile() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ public boolean getCopyFile() {
+ return copyFile_;
+ }
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ public Builder setCopyFile(boolean value) {
+ bitField0_ |= 0x00000020;
+ copyFile_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * <code>optional bool copy_file = 6 [default = false];</code>
+ */
+ public Builder clearCopyFile() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ copyFile_ = false;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:hbase.pb.BulkLoadHFileRequest)
}
@@ -39320,81 +39410,81 @@ public final class ClientProtos {
"_per_result\030\007 \003(\010\022\036\n\026more_results_in_reg" +
"ion\030\010 \001(\010\022\031\n\021heartbeat_message\030\t \001(\010\022+\n\014" +
"scan_metrics\030\n \001(\0132\025.hbase.pb.ScanMetric" +
- "s\"\206\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
+ "s\"\240\002\n\024BulkLoadHFileRequest\022)\n\006region\030\001 \002" +
"(\0132\031.hbase.pb.RegionSpecifier\022>\n\013family_" +
"path\030\002 \003(\0132).hbase.pb.BulkLoadHFileReque" +
"st.FamilyPath\022\026\n\016assign_seq_num\030\003 \001(\010\022+\n" +
"\010fs_token\030\004 \001(\0132\031.hbase.pb.DelegationTok" +
- "en\022\022\n\nbulk_token\030\005 \001(\t\032*\n\nFamilyPath\022\016\n\006" +
- "family\030\001 \002(\014\022\014\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHF",
- "ileResponse\022\016\n\006loaded\030\001 \002(\010\"V\n\017Delegatio" +
- "nToken\022\022\n\nidentifier\030\001 \001(\014\022\020\n\010password\030\002" +
- " \001(\014\022\014\n\004kind\030\003 \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026P" +
- "repareBulkLoadRequest\022\'\n\ntable_name\030\001 \002(" +
- "\0132\023.hbase.pb.TableName\022)\n\006region\030\002 \001(\0132\031" +
- ".hbase.pb.RegionSpecifier\"-\n\027PrepareBulk" +
- "LoadResponse\022\022\n\nbulk_token\030\001 \002(\t\"W\n\026Clea" +
- "nupBulkLoadRequest\022\022\n\nbulk_token\030\001 \002(\t\022)" +
- "\n\006region\030\002 \001(\0132\031.hbase.pb.RegionSpecifie" +
- "r\"\031\n\027CleanupBulkLoadResponse\"a\n\026Coproces",
- "sorServiceCall\022\013\n\003row\030\001 \002(\014\022\024\n\014service_n" +
- "ame\030\002 \002(\t\022\023\n\013method_name\030\003 \002(\t\022\017\n\007reques" +
- "t\030\004 \002(\014\"B\n\030CoprocessorServiceResult\022&\n\005v" +
- "alue\030\001 \001(\0132\027.hbase.pb.NameBytesPair\"v\n\031C" +
- "oprocessorServiceRequest\022)\n\006region\030\001 \002(\013" +
- "2\031.hbase.pb.RegionSpecifier\022.\n\004call\030\002 \002(" +
- "\0132 .hbase.pb.CoprocessorServiceCall\"o\n\032C" +
- "oprocessorServiceResponse\022)\n\006region\030\001 \002(" +
- "\0132\031.hbase.pb.RegionSpecifier\022&\n\005value\030\002 " +
- "\002(\0132\027.hbase.pb.NameBytesPair\"\226\001\n\006Action\022",
- "\r\n\005index\030\001 \001(\r\022)\n\010mutation\030\002 \001(\0132\027.hbase" +
- ".pb.MutationProto\022\032\n\003get\030\003 \001(\0132\r.hbase.p" +
- "b.Get\0226\n\014service_call\030\004 \001(\0132 .hbase.pb.C" +
- "oprocessorServiceCall\"k\n\014RegionAction\022)\n" +
- "\006region\030\001 \002(\0132\031.hbase.pb.RegionSpecifier" +
- "\022\016\n\006atomic\030\002 \001(\010\022 \n\006action\030\003 \003(\0132\020.hbase" +
- ".pb.Action\"c\n\017RegionLoadStats\022\027\n\014memstor" +
- "eLoad\030\001 \001(\005:\0010\022\030\n\rheapOccupancy\030\002 \001(\005:\0010" +
- "\022\035\n\022compactionPressure\030\003 \001(\005:\0010\"j\n\024Multi" +
- "RegionLoadStats\022)\n\006region\030\001 \003(\0132\031.hbase.",
- "pb.RegionSpecifier\022\'\n\004stat\030\002 \003(\0132\031.hbase" +
- ".pb.RegionLoadStats\"\336\001\n\021ResultOrExceptio" +
- "n\022\r\n\005index\030\001 \001(\r\022 \n\006result\030\002 \001(\0132\020.hbase" +
- ".pb.Result\022*\n\texception\030\003 \001(\0132\027.hbase.pb" +
- ".NameBytesPair\022:\n\016service_result\030\004 \001(\0132\"" +
- ".hbase.pb.CoprocessorServiceResult\0220\n\tlo" +
- "adStats\030\005 \001(\0132\031.hbase.pb.RegionLoadStats" +
- "B\002\030\001\"x\n\022RegionActionResult\0226\n\021resultOrEx" +
- "ception\030\001 \003(\0132\033.hbase.pb.ResultOrExcepti" +
- "on\022*\n\texception\030\002 \001(\0132\027.hbase.pb.NameByt",
- "esPair\"x\n\014MultiRequest\022,\n\014regionAction\030\001" +
- " \003(\0132\026.hbase.pb.RegionAction\022\022\n\nnonceGro" +
- "up\030\002 \001(\004\022&\n\tcondition\030\003 \001(\0132\023.hbase.pb.C" +
- "ondition\"\226\001\n\rMultiResponse\0228\n\022regionActi" +
- "onResult\030\001 \003(\0132\034.hbase.pb.RegionActionRe" +
- "sult\022\021\n\tprocessed\030\002 \001(\010\0228\n\020regionStatist" +
- "ics\030\003 \001(\0132\036.hbase.pb.MultiRegionLoadStat" +
- "s*\'\n\013Consistency\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE" +
- "\020\0012\263\005\n\rClientService\0222\n\003Get\022\024.hbase.pb.G" +
- "etRequest\032\025.hbase.pb.GetResponse\022;\n\006Muta",
- "te\022\027.hbase.pb.MutateRequest\032\030.hbase.pb.M" +
- "utateResponse\0225\n\004Scan\022\025.hbase.pb.ScanReq" +
- "uest\032\026.hbase.pb.ScanResponse\022P\n\rBulkLoad" +
- "HFile\022\036.hbase.pb.BulkLoadHFileRequest\032\037." +
- "hbase.pb.BulkLoadHFileResponse\022V\n\017Prepar" +
- "eBulkLoad\022 .hbase.pb.PrepareBulkLoadRequ" +
- "est\032!.hbase.pb.PrepareBulkLoadResponse\022V" +
- "\n\017CleanupBulkLoad\022 .hbase.pb.CleanupBulk" +
- "LoadRequest\032!.hbase.pb.CleanupBulkLoadRe" +
- "sponse\022X\n\013ExecService\022#.hbase.pb.Coproce",
- "ssorServiceRequest\032$.hbase.pb.Coprocesso" +
- "rServiceResponse\022d\n\027ExecRegionServerServ" +
- "ice\022#.hbase.pb.CoprocessorServiceRequest" +
- "\032$.hbase.pb.CoprocessorServiceResponse\0228" +
- "\n\005Multi\022\026.hbase.pb.MultiRequest\032\027.hbase." +
- "pb.MultiResponseBB\n*org.apache.hadoop.hb" +
- "ase.protobuf.generatedB\014ClientProtosH\001\210\001" +
- "\001\240\001\001"
+ "en\022\022\n\nbulk_token\030\005 \001(\t\022\030\n\tcopy_file\030\006 \001(" +
+ "\010:\005false\032*\n\nFamilyPath\022\016\n\006family\030\001 \002(\014\022\014",
+ "\n\004path\030\002 \002(\t\"\'\n\025BulkLoadHFileResponse\022\016\n" +
+ "\006loaded\030\001 \002(\010\"V\n\017DelegationToken\022\022\n\niden" +
+ "tifier\030\001 \001(\014\022\020\n\010password\030\002 \001(\014\022\014\n\004kind\030\003" +
+ " \001(\t\022\017\n\007service\030\004 \001(\t\"l\n\026PrepareBulkLoad" +
+ "Request\022\'\n\ntable_name\030\001 \002(\0132\023.hbase.pb.T" +
+ "ableName\022)\n\006region\030\002 \001(\0132\031.hbase.pb.Regi" +
+ "onSpecifier\"-\n\027PrepareBulkLoadResponse\022\022" +
+ "\n\nbulk_token\030\001 \002(\t\"W\n\026CleanupBulkLoadReq" +
+ "uest\022\022\n\nbulk_token\030\001 \002(\t\022)\n\006region\030\002 \001(\013" +
+ "2\031.hbase.pb.RegionSpecifier\"\031\n\027CleanupBu",
+ "lkLoadResponse\"a\n\026CoprocessorServiceCall" +
+ "\022\013\n\003row\030\001 \002(\014\022\024\n\014service_name\030\002 \002(\t\022\023\n\013m" +
+ "ethod_name\030\003 \002(\t\022\017\n\007request\030\004 \002(\014\"B\n\030Cop" +
+ "rocessorServiceResult\022&\n\005value\030\001 \001(\0132\027.h" +
+ "base.pb.NameBytesPair\"v\n\031CoprocessorServ" +
+ "iceRequest\022)\n\006region\030\001 \002(\0132\031.hbase.pb.Re" +
+ "gionSpecifier\022.\n\004call\030\002 \002(\0132 .hbase.pb.C" +
+ "oprocessorServiceCall\"o\n\032CoprocessorServ" +
+ "iceResponse\022)\n\006region\030\001 \002(\0132\031.hbase.pb.R" +
+ "egionSpecifier\022&\n\005value\030\002 \002(\0132\027.hbase.pb",
+ ".NameBytesPair\"\226\001\n\006Action\022\r\n\005index\030\001 \001(\r" +
+ "\022)\n\010mutation\030\002 \001(\0132\027.hbase.pb.MutationPr" +
+ "oto\022\032\n\003get\030\003 \001(\0132\r.hbase.pb.Get\0226\n\014servi" +
+ "ce_call\030\004 \001(\0132 .hbase.pb.CoprocessorServ" +
+ "iceCall\"k\n\014RegionAction\022)\n\006region\030\001 \002(\0132" +
+ "\031.hbase.pb.RegionSpecifier\022\016\n\006atomic\030\002 \001" +
+ "(\010\022 \n\006action\030\003 \003(\0132\020.hbase.pb.Action\"c\n\017" +
+ "RegionLoadStats\022\027\n\014memstoreLoad\030\001 \001(\005:\0010" +
+ "\022\030\n\rheapOccupancy\030\002 \001(\005:\0010\022\035\n\022compaction" +
+ "Pressure\030\003 \001(\005:\0010\"j\n\024MultiRegionLoadStat",
+ "s\022)\n\006region\030\001 \003(\0132\031.hbase.pb.RegionSpeci" +
+ "fier\022\'\n\004stat\030\002 \003(\0132\031.hbase.pb.RegionLoad" +
+ "Stats\"\336\001\n\021ResultOrException\022\r\n\005index\030\001 \001" +
+ "(\r\022 \n\006result\030\002 \001(\0132\020.hbase.pb.Result\022*\n\t" +
+ "exception\030\003 \001(\0132\027.hbase.pb.NameBytesPair" +
+ "\022:\n\016service_result\030\004 \001(\0132\".hbase.pb.Copr" +
+ "ocessorServiceResult\0220\n\tloadStats\030\005 \001(\0132" +
+ "\031.hbase.pb.RegionLoadStatsB\002\030\001\"x\n\022Region" +
+ "ActionResult\0226\n\021resultOrException\030\001 \003(\0132" +
+ "\033.hbase.pb.ResultOrException\022*\n\texceptio",
+ "n\030\002 \001(\0132\027.hbase.pb.NameBytesPair\"x\n\014Mult" +
+ "iRequest\022,\n\014regionAction\030\001 \003(\0132\026.hbase.p" +
+ "b.RegionAction\022\022\n\nnonceGroup\030\002 \001(\004\022&\n\tco" +
+ "ndition\030\003 \001(\0132\023.hbase.pb.Condition\"\226\001\n\rM" +
+ "ultiResponse\0228\n\022regionActionResult\030\001 \003(\013" +
+ "2\034.hbase.pb.RegionActionResult\022\021\n\tproces" +
+ "sed\030\002 \001(\010\0228\n\020regionStatistics\030\003 \001(\0132\036.hb" +
+ "ase.pb.MultiRegionLoadStats*\'\n\013Consisten" +
+ "cy\022\n\n\006STRONG\020\000\022\014\n\010TIMELINE\020\0012\263\005\n\rClientS" +
+ "ervice\0222\n\003Get\022\024.hbase.pb.GetRequest\032\025.hb",
+ "ase.pb.GetResponse\022;\n\006Mutate\022\027.hbase.pb." +
+ "MutateRequest\032\030.hbase.pb.MutateResponse\022" +
+ "5\n\004Scan\022\025.hbase.pb.ScanRequest\032\026.hbase.p" +
+ "b.ScanResponse\022P\n\rBulkLoadHFile\022\036.hbase." +
+ "pb.BulkLoadHFileRequest\032\037.hbase.pb.BulkL" +
+ "oadHFileResponse\022V\n\017PrepareBulkLoad\022 .hb" +
+ "ase.pb.PrepareBulkLoadRequest\032!.hbase.pb" +
+ ".PrepareBulkLoadResponse\022V\n\017CleanupBulkL" +
+ "oad\022 .hbase.pb.CleanupBulkLoadRequest\032!." +
+ "hbase.pb.CleanupBulkLoadResponse\022X\n\013Exec",
+ "Service\022#.hbase.pb.CoprocessorServiceReq" +
+ "uest\032$.hbase.pb.CoprocessorServiceRespon" +
+ "se\022d\n\027ExecRegionServerService\022#.hbase.pb" +
+ ".CoprocessorServiceRequest\032$.hbase.pb.Co" +
+ "processorServiceResponse\0228\n\005Multi\022\026.hbas" +
+ "e.pb.MultiRequest\032\027.hbase.pb.MultiRespon" +
+ "seBB\n*org.apache.hadoop.hbase.protobuf.g" +
+ "eneratedB\014ClientProtosH\001\210\001\001\240\001\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -39502,7 +39592,7 @@ public final class ClientProtos {
internal_static_hbase_pb_BulkLoadHFileRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor,
- new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", });
+ new java.lang.String[] { "Region", "FamilyPath", "AssignSeqNum", "FsToken", "BulkToken", "CopyFile", });
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_descriptor =
internal_static_hbase_pb_BulkLoadHFileRequest_descriptor.getNestedTypes().get(0);
internal_static_hbase_pb_BulkLoadHFileRequest_FamilyPath_fieldAccessorTable = new
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-protocol/src/main/protobuf/Client.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol/src/main/protobuf/Client.proto b/hbase-protocol/src/main/protobuf/Client.proto
index adb66f7..6c0c00c 100644
--- a/hbase-protocol/src/main/protobuf/Client.proto
+++ b/hbase-protocol/src/main/protobuf/Client.proto
@@ -339,6 +339,7 @@ message BulkLoadHFileRequest {
optional bool assign_seq_num = 3;
optional DelegationToken fs_token = 4;
optional string bulk_token = 5;
+ optional bool copy_file = 6 [default = false];
message FamilyPath {
required bytes family = 1;
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index e3542ef8..c831efd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -115,6 +115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
public final static String CREATE_TABLE_CONF_KEY = "create.table";
public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
+ public final static String ALWAYS_COPY_FILES = "always.copy.files";
// We use a '.' prefix which is ignored when walking directory trees
// above. It is invalid family name.
@@ -328,7 +329,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
RegionLocator regionLocator) throws TableNotFoundException, IOException {
- doBulkLoad(hfofDir, admin, table, regionLocator, false);
+ doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
}
void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
@@ -360,10 +361,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param table the table to load into
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
+ * @param copyFile always copy hfiles if true
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Map<byte[], List<Path>> map, final Admin admin, Table table,
- RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+ RegionLocator regionLocator, boolean silence, boolean copyFile)
+ throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
}
@@ -386,7 +389,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
break;
}
}
- performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+ performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
} finally {
cleanup(admin, queue, pool, secureClient);
}
@@ -402,10 +405,12 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* @param table the table to load into
* @param regionLocator region locator
* @param silence true to ignore unmatched column families
+ * @param copyFile always copy hfiles if true
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
- RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
+ RegionLocator regionLocator, boolean silence, boolean copyFile)
+ throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
}
@@ -437,7 +442,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
pool = createExecutorService();
secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
- performBulkLoad(admin, table, regionLocator, queue, pool, secureClient);
+ performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
} finally {
cleanup(admin, queue, pool, secureClient);
}
@@ -445,7 +450,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
void performBulkLoad(final Admin admin, Table table, RegionLocator regionLocator,
Deque<LoadQueueItem> queue, ExecutorService pool,
- SecureBulkLoadClient secureClient) throws IOException {
+ SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
int count = 0;
if(isSecureBulkLoadEndpointAvailable()) {
@@ -486,7 +491,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ " hfiles to one family of one region");
}
- bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups);
+ bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile);
// NOTE: The next iteration's split / group could happen in parallel to
// atomic bulkloads assuming that there are splits and no merges, and
@@ -599,12 +604,29 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
Pair<byte[][], byte[][]> startEndKeys) throws IOException {
+ loadHFileQueue(table, conn, queue, startEndKeys, false);
+ }
+
+ /**
+ * Used by the replication sink to load the hfiles from the source cluster. It does the following,
+ * <ol>
+ * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)}</li>
+ * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
+ * </li>
+ * </ol>
+ * @param table Table to which these hfiles should be loaded to
+ * @param conn Connection to use
+ * @param queue {@link LoadQueueItem} has hfiles yet to be loaded
+ * @param startEndKeys starting and ending row keys of the region
+ */
+ public void loadHFileQueue(final Table table, final Connection conn, Deque<LoadQueueItem> queue,
+ Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
ExecutorService pool = null;
try {
pool = createExecutorService();
Multimap<ByteBuffer, LoadQueueItem> regionGroups =
groupOrSplitPhase(table, pool, queue, startEndKeys);
- bulkLoadPhase(table, conn, pool, queue, regionGroups);
+ bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile);
} finally {
if (pool != null) {
pool.shutdown();
@@ -619,7 +641,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
protected void bulkLoadPhase(final Table table, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile) throws IOException {
// atomically bulk load the groups.
Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e: regionGroups.asMap().entrySet()){
@@ -630,7 +652,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
@Override
public List<LoadQueueItem> call() throws Exception {
List<LoadQueueItem> toRetry =
- tryAtomicRegionLoad(conn, table.getName(), first, lqis);
+ tryAtomicRegionLoad(conn, table.getName(), first, lqis, copyFile);
return toRetry;
}
};
@@ -890,8 +912,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* failure
*/
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
- final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
- throws IOException {
+ final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis,
+ boolean copyFile) throws IOException {
final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
for (LoadQueueItem lqi : lqis) {
if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
@@ -911,7 +933,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = conn.getTable(getTableName())) {
secureClient = new SecureBulkLoadClient(getConf(), table);
success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- assignSeqIds, fsDelegationToken.getUserToken(), bulkToken);
+ assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
}
return success;
} finally {
@@ -1172,10 +1194,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+ boolean copyFiles = "yes".equalsIgnoreCase(getConf().get(ALWAYS_COPY_FILES, ""));
if (dirPath != null) {
- doBulkLoad(hfofDir, admin, table, locator, silence);
+ doBulkLoad(hfofDir, admin, table, locator, silence, copyFiles);
} else {
- doBulkLoad(map, admin, table, locator, silence);
+ doBulkLoad(map, admin, table, locator, silence, copyFiles);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index debaec9..f6d2e36 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -5422,6 +5422,12 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException {
+ return bulkLoadHFiles(familyPaths, assignSeqId, bulkLoadListener, false);
+ }
+
+ @Override
+ public boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
+ BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException {
long seqId = -1;
Map<byte[], List<Path>> storeFiles = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
Map<String, Long> storeFilesSizes = new HashMap<String, Long>();
@@ -5503,7 +5509,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
String finalPath = path;
if (bulkLoadListener != null) {
- finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
+ finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
}
Path commitedStoreFile = store.bulkLoadHFile(finalPath, seqId);
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 5ba8afd..541680c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -2078,7 +2078,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
}
if (!bypass) {
- loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null);
+ loaded = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
+ request.getCopyFile());
}
if (region.getCoprocessorHost() != null) {
loaded = region.getCoprocessorHost().postBulkLoadHFile(familyPaths, loaded);
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
index efd68b8..4e76e76 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java
@@ -512,7 +512,6 @@ public interface Region extends ConfigurationObserver {
* pre/post processing of a given bulkload call
*/
interface BulkLoadListener {
-
/**
* Called before an HFile is actually loaded
* @param family family being loaded to
@@ -520,7 +519,8 @@ public interface Region extends ConfigurationObserver {
* @return final path to be used for actual loading
* @throws IOException
*/
- String prepareBulkLoad(byte[] family, String srcPath) throws IOException;
+ String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
+ throws IOException;
/**
* Called after a successful HFile load
@@ -553,6 +553,21 @@ public interface Region extends ConfigurationObserver {
boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
BulkLoadListener bulkLoadListener) throws IOException;
+ /**
+ * Attempts to atomically load a group of hfiles. This is critical for loading
+ * rows with multiple column families atomically.
+ *
+ * @param familyPaths List of Pair<byte[] column family, String hfilePath>
+ * @param assignSeqId
+ * @param bulkLoadListener Internal hooks enabling massaging/preparation of a
+ * file about to be bulk loaded
+ * @param copyFile always copy hfiles if true
+ * @return true if successful, false if failed recoverably
+ * @throws IOException if failed unrecoverably.
+ */
+ boolean bulkLoadHFiles(Collection<Pair<byte[], String>> familyPaths, boolean assignSeqId,
+ BulkLoadListener bulkLoadListener, boolean copyFile) throws IOException;
+
///////////////////////////////////////////////////////////////////////////
// Coprocessors
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
index 0d64e4e..88c993e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SecureBulkLoadManager.java
@@ -233,7 +233,7 @@ public class SecureBulkLoadManager {
//We call bulkLoadHFiles as requesting user
//To enable access prior to staging
return region.bulkLoadHFiles(familyPaths, true,
- new SecureBulkLoadListener(fs, bulkToken, conf));
+ new SecureBulkLoadListener(fs, bulkToken, conf), request.getCopyFile());
} catch (Exception e) {
LOG.error("Failed to complete bulk load", e);
}
@@ -305,7 +305,8 @@ public class SecureBulkLoadManager {
}
@Override
- public String prepareBulkLoad(final byte[] family, final String srcPath) throws IOException {
+ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile)
+ throws IOException {
Path p = new Path(srcPath);
Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
@@ -329,6 +330,9 @@ public class SecureBulkLoadManager {
LOG.debug("Bulk-load file " + srcPath + " is on different filesystem than " +
"the destination filesystem. Copying file over to destination staging dir.");
FileUtil.copy(srcFs, p, fs, stageP, false, conf);
+ } else if (copyFile) {
+ LOG.debug("Bulk-load file " + srcPath + " is copied to destination staging dir.");
+ FileUtil.copy(srcFs, p, fs, stageP, false, conf);
} else {
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index 0b96720..88b9247 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -131,6 +131,17 @@ public class TestLoadIncrementalHFiles {
});
}
+ @Test(timeout = 120000)
+ public void testSimpleLoadWithFileCopy() throws Exception {
+ String testName = "mytable_testSimpleLoadWithFileCopy";
+ final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
+ runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), BloomType.NONE,
+ false, null, new byte[][][] {
+ new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
+ }, false, true);
+ }
+
/**
* Test case that creates some regions and loads
* HFiles that cross the boundaries of those regions
@@ -291,12 +302,12 @@ public class TestLoadIncrementalHFiles {
boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
throws Exception {
HTableDescriptor htd = buildHTD(tableName, bloomType);
- runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap);
+ runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap, false);
}
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap)
- throws Exception {
+ boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+ boolean copyFiles) throws Exception {
Path dir = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem();
dir = dir.makeQualified(fs);
@@ -305,9 +316,11 @@ public class TestLoadIncrementalHFiles {
int hfileIdx = 0;
Map<byte[], List<Path>> map = null;
List<Path> list = null;
+ if (useMap || copyFiles) {
+ list = new ArrayList<>();
+ }
if (useMap) {
map = new TreeMap<byte[], List<Path>>(Bytes.BYTES_COMPARATOR);
- list = new ArrayList<>();
map.put(FAMILY, list);
}
for (byte[][] range : hfileRanges) {
@@ -326,7 +339,11 @@ public class TestLoadIncrementalHFiles {
}
final TableName tableName = htd.getTableName();
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
+ Configuration conf = util.getConfiguration();
+ if (copyFiles) {
+ conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+ }
+ LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
String [] args= {dir.toString(), tableName.toString()};
if (useMap) {
loader.run(null, map, tableName);
@@ -334,6 +351,12 @@ public class TestLoadIncrementalHFiles {
loader.run(args);
}
+ if (copyFiles) {
+ for (Path p : list) {
+ assertTrue(fs.exists(p));
+ }
+ }
+
Table table = util.getConnection().getTable(tableName);
try {
assertEquals(expectedRows, util.countRows(table));
@@ -419,7 +442,7 @@ public class TestLoadIncrementalHFiles {
htd.addFamily(family);
try {
- runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false);
+ runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, false, false);
assertTrue("Loading into table with non-existent family should have failed", false);
} catch (Exception e) {
assertTrue("IOException expected", e instanceof IOException);
http://git-wip-us.apache.org/repos/asf/hbase/blob/219c7864/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
index 66d7eb1..90a1409 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFilesSplitRecovery.java
@@ -281,8 +281,8 @@ public class TestLoadIncrementalHFilesSplitRecovery {
LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
@Override
protected List<LoadQueueItem> tryAtomicRegionLoad(final Connection conn,
- TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis)
- throws IOException {
+ TableName tableName, final byte[] first, Collection<LoadQueueItem> lqis,
+ boolean copyFile) throws IOException {
int i = attmptedCalls.incrementAndGet();
if (i == 1) {
Connection errConn;
@@ -293,10 +293,10 @@ public class TestLoadIncrementalHFilesSplitRecovery {
throw new RuntimeException("mocking cruft, should never happen");
}
failedCalls.incrementAndGet();
- return super.tryAtomicRegionLoad(errConn, tableName, first, lqis);
+ return super.tryAtomicRegionLoad(errConn, tableName, first, lqis, copyFile);
}
- return super.tryAtomicRegionLoad(conn, tableName, first, lqis);
+ return super.tryAtomicRegionLoad(conn, tableName, first, lqis, copyFile);
}
};
try {
@@ -359,13 +359,14 @@ public class TestLoadIncrementalHFilesSplitRecovery {
@Override
protected void bulkLoadPhase(final Table htable, final Connection conn,
ExecutorService pool, Deque<LoadQueueItem> queue,
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups) throws IOException {
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile)
+ throws IOException {
int i = attemptedCalls.incrementAndGet();
if (i == 1) {
// On first attempt force a split.
forceSplit(table);
}
- super.bulkLoadPhase(htable, conn, pool, queue, regionGroups);
+ super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile);
}
};