Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/02 09:05:22 UTC
[hbase] 10/25: HBASE-21779 Reimplement BulkLoadHFilesTool to use AsyncClusterConnection
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 4a4e47211eb1975d007dd9b7147e8c2357c75cc2
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Feb 2 17:36:41 2019 +0800
HBASE-21779 Reimplement BulkLoadHFilesTool to use AsyncClusterConnection
---
.../backup/mapreduce/MapReduceRestoreJob.java | 18 +-
.../hadoop/hbase/backup/util/BackupUtils.java | 17 +-
.../hadoop/hbase/backup/util/RestoreTool.java | 9 +-
.../backup/TestIncrementalBackupWithBulkLoad.java | 6 +-
.../hadoop/hbase/client/ConnectionUtils.java | 41 +
.../hadoop/hbase/client/RawAsyncTableImpl.java | 46 +-
.../hadoop/hbase/client/SecureBulkLoadClient.java | 150 ---
...estReplicationSyncUpToolWithBulkLoadedData.java | 6 +-
.../hbase/mapreduce/IntegrationTestBulkLoad.java | 26 +-
.../hbase/mapreduce/IntegrationTestImportTsv.java | 9 +-
.../apache/hadoop/hbase/mapreduce/CopyTable.java | 27 +-
.../hadoop/hbase/mapreduce/HRegionPartitioner.java | 2 +-
.../hbase/mapreduce/TestHFileOutputFormat2.java | 24 +-
.../hbase/client/AsyncClusterConnection.java | 19 +
.../hbase/client/AsyncClusterConnectionImpl.java | 55 +
.../hbase/client/ClusterConnectionFactory.java | 16 +-
.../hbase/mapreduce/LoadIncrementalHFiles.java | 77 --
.../mob/compactions/PartitionedMobCompactor.java | 19 +-
.../replication/regionserver/HFileReplicator.java | 125 +-
.../replication/regionserver/ReplicationSink.java | 43 +-
.../regionserver/WALEntrySinkFilter.java | 8 +-
.../hadoop/hbase/tool/BulkLoadHFilesTool.java | 996 ++++++++++++++-
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 1282 --------------------
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 11 +-
.../hbase/client/DummyAsyncClusterConnection.java | 155 +++
.../hadoop/hbase/client/DummyAsyncRegistry.java | 60 +
.../hadoop/hbase/client/DummyAsyncTable.java | 159 +++
.../hbase/client/TestReplicaWithCluster.java | 40 +-
.../coprocessor/TestRegionObserverInterface.java | 4 +-
.../hbase/quotas/SpaceQuotaHelperForTests.java | 45 +-
.../hbase/quotas/TestLowLatencySpaceQuotas.java | 17 +-
.../hadoop/hbase/quotas/TestSpaceQuotas.java | 30 +-
.../regionserver/TestHRegionServerBulkLoad.java | 69 +-
.../regionserver/TestScannerWithBulkload.java | 19 +-
.../regionserver/TestSecureBulkLoadManager.java | 27 +-
.../hbase/replication/TestMasterReplication.java | 6 +-
.../regionserver/TestReplicationSink.java | 6 +-
.../regionserver/TestWALEntrySinkFilter.java | 429 +------
.../security/access/TestAccessController.java | 12 +-
...rementalHFiles.java => TestBulkLoadHFiles.java} | 157 ++-
.../tool/TestBulkLoadHFilesSplitRecovery.java | 486 ++++++++
.../TestLoadIncrementalHFilesSplitRecovery.java | 630 ----------
...alHFiles.java => TestSecureBulkLoadHFiles.java} | 11 +-
... => TestSecureBulkLoadHFilesSplitRecovery.java} | 9 +-
44 files changed, 2361 insertions(+), 3042 deletions(-)
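
The bulk of the change is mechanical: call sites that previously constructed org.apache.hadoop.hbase.tool.LoadIncrementalHFiles and drove it through Tool.run() now go through the BulkLoadHFiles interface. A minimal sketch of the new call pattern, with a hypothetical table name and HFile directory that are not taken from this patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

    public class BulkLoadSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        TableName tableName = TableName.valueOf("my_table"); // hypothetical table
        Path hfileDir = new Path("/tmp/hfiles");             // hypothetical HFile output dir
        // Old style, removed by this patch:
        //   new LoadIncrementalHFiles(conf).run(new String[] { hfileDir.toString(),
        //       tableName.getNameAsString() });
        // New style: obtain a loader from the configuration and call bulkLoad directly.
        BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
        if (loader.bulkLoad(tableName, hfileDir).isEmpty()) {
          // Call sites below treat an empty result as "nothing was loaded", i.e. failure.
          throw new IllegalStateException("Bulk load did not load any files");
        }
      }
    }

The secure bulk load RPCs that used to live in SecureBulkLoadClient move onto AsyncClusterConnection; see the sketch after that interface further down.
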
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
index 1256289..9daa282 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/MapReduceRestoreJob.java
@@ -17,11 +17,9 @@
*/
package org.apache.hadoop.hbase.backup.mapreduce;
-import static org.apache.hadoop.hbase.backup.util.BackupUtils.failed;
import static org.apache.hadoop.hbase.backup.util.BackupUtils.succeeded;
import java.io.IOException;
-
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -29,7 +27,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.RestoreJob;
import org.apache.hadoop.hbase.backup.util.BackupUtils;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.util.Tool;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -42,8 +40,7 @@ import org.slf4j.LoggerFactory;
* HFiles which are aligned with a region boundaries of a table being
* restored.
*
- * The resulting HFiles then are loaded using HBase bulk load tool
- * {@link LoadIncrementalHFiles}
+ * The resulting HFiles then are loaded using HBase bulk load tool {@link BulkLoadHFiles}.
*/
@InterfaceAudience.Private
public class MapReduceRestoreJob implements RestoreJob {
@@ -88,23 +85,20 @@ public class MapReduceRestoreJob implements RestoreJob {
};
int result;
- int loaderResult;
try {
player.setConf(getConf());
result = player.run(playerArgs);
if (succeeded(result)) {
// do bulk load
- LoadIncrementalHFiles loader = BackupUtils.createLoader(getConf());
+ BulkLoadHFiles loader = BackupUtils.createLoader(getConf());
if (LOG.isDebugEnabled()) {
LOG.debug("Restoring HFiles from directory " + bulkOutputPath);
}
- String[] args = { bulkOutputPath.toString(), newTableNames[i].getNameAsString() };
- loaderResult = loader.run(args);
- if (failed(loaderResult)) {
- throw new IOException("Can not restore from backup directory " + dirs
- + " (check Hadoop and HBase logs). Bulk loader return code =" + loaderResult);
+ if (loader.bulkLoad(newTableNames[i], bulkOutputPath).isEmpty()) {
+ throw new IOException("Can not restore from backup directory " + dirs +
+ " (check Hadoop and HBase logs). Bulk loader returns null");
}
} else {
throw new IOException("Can not restore from backup directory " + dirs
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
index af8b954..fe2a977 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupUtils.java
@@ -30,7 +30,6 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.TreeSet;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -54,7 +53,7 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -722,7 +721,7 @@ public final class BackupUtils {
return result == 0;
}
- public static LoadIncrementalHFiles createLoader(Configuration config) throws IOException {
+ public static BulkLoadHFiles createLoader(Configuration config) {
// set configuration for restore:
// LoadIncrementalHFile needs more time
// <name>hbase.rpc.timeout</name> <value>600000</value>
@@ -732,15 +731,9 @@ public final class BackupUtils {
// By default, it is 32 and loader will fail if # of files in any region exceed this
// limit. Bad for snapshot restore.
- conf.setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
- conf.set(LoadIncrementalHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
- LoadIncrementalHFiles loader;
- try {
- loader = new LoadIncrementalHFiles(conf);
- } catch (Exception e) {
- throw new IOException(e);
- }
- return loader;
+ conf.setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY, Integer.MAX_VALUE);
+ conf.set(BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");
+ return BulkLoadHFiles.create(conf);
}
public static String findMostRecentBackupId(String[] backupIds) {
diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
index 13b183d..92254fa 100644
--- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
+++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/RestoreTool.java
@@ -25,7 +25,6 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.TreeMap;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -46,7 +45,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.snapshot.SnapshotDescriptionUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
@@ -451,12 +450,12 @@ public class RestoreTool {
}
}
}
- return LoadIncrementalHFiles.inferBoundaries(map);
+ return BulkLoadHFilesTool.inferBoundaries(map);
}
/**
- * Prepare the table for bulkload, most codes copied from
- * {@link LoadIncrementalHFiles#createTable(TableName, String, Admin)}
+ * Prepare the table for bulkload, most codes copied from {@code createTable} method in
+ * {@code BulkLoadHFilesTool}.
* @param conn connection
* @param tableBackupPath path
* @param tableName table name
diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
index 74dd569..82f0fb7 100644
--- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupWithBulkLoad.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.tool.TestLoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.TestBulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.Assert;
@@ -92,7 +92,7 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
int NB_ROWS2 = 20;
LOG.debug("bulk loading into " + testName);
- int actual = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
+ int actual = TestBulkLoadHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
qualName, false, null, new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
@@ -105,7 +105,7 @@ public class TestIncrementalBackupWithBulkLoad extends TestBackupBase {
assertTrue(checkSucceeded(backupIdIncMultiple));
// #4 bulk load again
LOG.debug("bulk loading into " + testName);
- int actual1 = TestLoadIncrementalHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
+ int actual1 = TestBulkLoadHFiles.loadHFiles(testName, table1Desc, TEST_UTIL, famName,
qualName, false, null,
new byte[][][] { new byte[][] { Bytes.toBytes("ppp"), Bytes.toBytes("qqq") },
new byte[][] { Bytes.toBytes("rrr"), Bytes.toBytes("sss") }, },
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 4a2fa3a..4ec7e32 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
@@ -59,6 +60,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.io.netty.util.Timer;
@@ -695,4 +697,43 @@ public final class ConnectionUtils {
metrics -> ResultStatsUtil.updateStats(metrics, serverName, regionName, regionLoadStats));
});
}
+
+ @FunctionalInterface
+ interface Converter<D, I, S> {
+ D convert(I info, S src) throws IOException;
+ }
+
+ @FunctionalInterface
+ interface RpcCall<RESP, REQ> {
+ void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
+ RpcCallback<RESP> done);
+ }
+
+ static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(HBaseRpcController controller,
+ HRegionLocation loc, ClientService.Interface stub, REQ req,
+ Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
+ Converter<RESP, HBaseRpcController, PRESP> respConverter) {
+ CompletableFuture<RESP> future = new CompletableFuture<>();
+ try {
+ rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
+ new RpcCallback<PRESP>() {
+
+ @Override
+ public void run(PRESP resp) {
+ if (controller.failed()) {
+ future.completeExceptionally(controller.getFailed());
+ } else {
+ try {
+ future.complete(respConverter.convert(controller, resp));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ }
+ }
+ });
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }
}
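
The Converter/RpcCall/call trio moved into ConnectionUtils is the generic bridge from a protobuf stub method to a CompletableFuture: the request converter builds the protobuf request from the region name, the RpcCall invokes the stub, and the response converter maps the protobuf response back to a client-side object. A sketch of how the four type parameters line up, modelled on the RawAsyncTableImpl.get() change later in this patch; controller, loc, stub and get are supplied by the surrounding retrying caller:

    // <REQ, PREQ, PRESP, RESP> = <Get, GetRequest, GetResponse, Result>
    CompletableFuture<Result> future = ConnectionUtils
        .<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
            // Converter<PREQ, byte[], REQ>: region name + Get -> GetRequest
            RequestConverter::buildGetRequest,
            // RpcCall<PRESP, PREQ>: invoke the async stub method
            (s, c, req, done) -> s.get(c, req, done),
            // Converter<RESP, HBaseRpcController, PRESP>: GetResponse -> Result
            (c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner()));
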
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 8050137..c357b1f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
+import org.apache.hadoop.hbase.client.ConnectionUtils.Converter;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
@@ -156,51 +157,12 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return conn.getRegionLocator(tableName);
}
- @FunctionalInterface
- private interface Converter<D, I, S> {
- D convert(I info, S src) throws IOException;
- }
-
- @FunctionalInterface
- private interface RpcCall<RESP, REQ> {
- void call(ClientService.Interface stub, HBaseRpcController controller, REQ req,
- RpcCallback<RESP> done);
- }
-
- private static <REQ, PREQ, PRESP, RESP> CompletableFuture<RESP> call(
- HBaseRpcController controller, HRegionLocation loc, ClientService.Interface stub, REQ req,
- Converter<PREQ, byte[], REQ> reqConvert, RpcCall<PRESP, PREQ> rpcCall,
- Converter<RESP, HBaseRpcController, PRESP> respConverter) {
- CompletableFuture<RESP> future = new CompletableFuture<>();
- try {
- rpcCall.call(stub, controller, reqConvert.convert(loc.getRegion().getRegionName(), req),
- new RpcCallback<PRESP>() {
-
- @Override
- public void run(PRESP resp) {
- if (controller.failed()) {
- future.completeExceptionally(controller.getFailed());
- } else {
- try {
- future.complete(respConverter.convert(controller, resp));
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- }
- }
- });
- } catch (IOException e) {
- future.completeExceptionally(e);
- }
- return future;
- }
-
private static <REQ, RESP> CompletableFuture<RESP> mutate(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub, REQ req,
Converter<MutateRequest, byte[], REQ> reqConvert,
Converter<RESP, HBaseRpcController, MutateResponse> respConverter) {
- return call(controller, loc, stub, req, reqConvert, (s, c, r, done) -> s.mutate(c, r, done),
- respConverter);
+ return ConnectionUtils.call(controller, loc, stub, req, reqConvert,
+ (s, c, r, done) -> s.mutate(c, r, done), respConverter);
}
private static <REQ> CompletableFuture<Void> voidMutate(HBaseRpcController controller,
@@ -247,7 +209,7 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private CompletableFuture<Result> get(Get get, int replicaId) {
return this.<Result, Get> newCaller(get, readRpcTimeoutNs)
- .action((controller, loc, stub) -> RawAsyncTableImpl
+ .action((controller, loc, stub) -> ConnectionUtils
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
(c, resp) -> ProtobufUtil.toResult(resp.getResult(), c.cellScanner())))
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
deleted file mode 100644
index 2186271..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SecureBulkLoadClient.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.security.token.Token;
-
-import static org.apache.hadoop.hbase.HConstants.PRIORITY_UNSET;
-
-/**
- * Client proxy for SecureBulkLoadProtocol
- */
-@InterfaceAudience.Private
-public class SecureBulkLoadClient {
- private Table table;
- private final RpcControllerFactory rpcControllerFactory;
-
- public SecureBulkLoadClient(final Configuration conf, Table table) {
- this.table = table;
- this.rpcControllerFactory = new RpcControllerFactory(conf);
- }
-
- public String prepareBulkLoad(final Connection conn) throws IOException {
- try {
- ClientServiceCallable<String> callable = new ClientServiceCallable<String>(conn,
- table.getName(), HConstants.EMPTY_START_ROW,
- this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
- @Override
- protected String rpcCall() throws Exception {
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- RegionSpecifier region =
- RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
- PrepareBulkLoadRequest request = PrepareBulkLoadRequest.newBuilder()
- .setTableName(ProtobufUtil.toProtoTableName(table.getName()))
- .setRegion(region).build();
- PrepareBulkLoadResponse response = getStub().prepareBulkLoad(null, request);
- return response.getBulkToken();
- }
- };
- return RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
- .<String> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
- } catch (Throwable throwable) {
- throw new IOException(throwable);
- }
- }
-
- public void cleanupBulkLoad(final Connection conn, final String bulkToken) throws IOException {
- try {
- ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
- table.getName(), HConstants.EMPTY_START_ROW, this.rpcControllerFactory.newController(), PRIORITY_UNSET) {
- @Override
- protected Void rpcCall() throws Exception {
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- RegionSpecifier region = RequestConverter.buildRegionSpecifier(
- RegionSpecifierType.REGION_NAME, regionName);
- CleanupBulkLoadRequest request =
- CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bulkToken).build();
- getStub().cleanupBulkLoad(null, request);
- return null;
- }
- };
- RpcRetryingCallerFactory.instantiate(conn.getConfiguration(), null)
- .<Void> newCaller().callWithRetries(callable, Integer.MAX_VALUE);
- } catch (Throwable throwable) {
- throw new IOException(throwable);
- }
- }
-
- /**
- * Securely bulk load a list of HFiles using client protocol.
- *
- * @param client
- * @param familyPaths
- * @param regionName
- * @param assignSeqNum
- * @param userToken
- * @param bulkToken
- * @return true if all are loaded
- * @throws IOException
- */
- public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
- final List<Pair<byte[], String>> familyPaths,
- final byte[] regionName, boolean assignSeqNum,
- final Token<?> userToken, final String bulkToken) throws IOException {
- return secureBulkLoadHFiles(client, familyPaths, regionName, assignSeqNum, userToken, bulkToken,
- false);
- }
-
- /**
- * Securely bulk load a list of HFiles using client protocol.
- *
- * @param client
- * @param familyPaths
- * @param regionName
- * @param assignSeqNum
- * @param userToken
- * @param bulkToken
- * @param copyFiles
- * @return true if all are loaded
- * @throws IOException
- */
- public boolean secureBulkLoadHFiles(final ClientService.BlockingInterface client,
- final List<Pair<byte[], String>> familyPaths,
- final byte[] regionName, boolean assignSeqNum,
- final Token<?> userToken, final String bulkToken, boolean copyFiles) throws IOException {
- BulkLoadHFileRequest request =
- RequestConverter.buildBulkLoadHFileRequest(familyPaths, regionName, assignSeqNum,
- userToken, bulkToken, copyFiles);
-
- try {
- BulkLoadHFileResponse response = client.bulkLoadHFile(null, request);
- return response.getLoaded();
- } catch (Exception se) {
- throw ProtobufUtil.handleRemoteException(se);
- }
- }
-}
diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
index eb575c5..3e823c3 100644
--- a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
+++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSyncUpToolWithBulkLoadedData.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.BeforeClass;
@@ -218,9 +218,7 @@ public class TestReplicationSyncUpToolWithBulkLoadedData extends TestReplication
}
final TableName tableName = source.getName();
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(utility1.getConfiguration());
- String[] args = { dir.toString(), tableName.toString() };
- loader.run(args);
+ BulkLoadHFiles.create(utility1.getConfiguration()).bulkLoad(tableName, dir);
}
private void wait(Table target, int expectedCount, String msg) throws IOException,
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
index a28c9f6..79dfe6c 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestBulkLoad.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
@@ -60,7 +59,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
@@ -86,6 +85,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
@@ -292,24 +292,18 @@ public class IntegrationTestBulkLoad extends IntegrationTestBase {
// Set where to place the hfiles.
FileOutputFormat.setOutputPath(job, p);
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Admin admin = conn.getAdmin();
- Table table = conn.getTable(getTablename());
- RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
-
+ try (Connection conn = ConnectionFactory.createConnection(conf); Admin admin = conn.getAdmin();
+ RegionLocator regionLocator = conn.getRegionLocator(getTablename())) {
// Configure the partitioner and other things needed for HFileOutputFormat.
- HFileOutputFormat2.configureIncrementalLoad(job, table.getTableDescriptor(), regionLocator);
-
+ HFileOutputFormat2.configureIncrementalLoad(job, admin.getDescriptor(getTablename()),
+ regionLocator);
// Run the job making sure it works.
assertEquals(true, job.waitForCompletion(true));
-
- // Create a new loader.
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
-
- // Load the HFiles in.
- loader.doBulkLoad(p, admin, table, regionLocator);
}
-
+ // Create a new loader.
+ BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
+ // Load the HFiles in.
+ loader.bulkLoad(getTablename(), p);
// Delete the files.
util.getTestFileSystem().delete(p, true);
}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
index ab5f2bb..c80d61c 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/mapreduce/IntegrationTestImportTsv.java
@@ -29,7 +29,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@@ -45,7 +44,7 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
import org.apache.hadoop.util.Tool;
@@ -60,7 +59,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Validate ImportTsv + LoadIncrementalHFiles on a distributed cluster.
+ * Validate ImportTsv + BulkLoadFiles on a distributed cluster.
*/
@Category(IntegrationTests.class)
public class IntegrationTestImportTsv extends Configured implements Tool {
@@ -141,8 +140,8 @@ public class IntegrationTestImportTsv extends Configured implements Tool {
String[] args = { hfiles.toString(), tableName.getNameAsString() };
LOG.info(format("Running LoadIncrememntalHFiles with args: %s", Arrays.asList(args)));
- assertEquals("Loading HFiles failed.",
- 0, ToolRunner.run(new LoadIncrementalHFiles(new Configuration(getConf())), args));
+ assertEquals("Loading HFiles failed.", 0,
+ ToolRunner.run(new BulkLoadHFilesTool(getConf()), args));
Table table = null;
Scan scan = new Scan() {{
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
index b59c9e6..a443b4b 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/CopyTable.java
@@ -22,28 +22,28 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
-
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
-import org.apache.hadoop.hbase.mapreduce.Import.Importer;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.Import.CellImporter;
+import org.apache.hadoop.hbase.mapreduce.Import.Importer;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Tool used to copy a table to another one which can be on a different setup.
@@ -416,13 +416,12 @@ public class CopyTable extends Configured implements Tool {
int code = 0;
if (bulkload) {
LOG.info("Trying to bulk load data to destination table: " + dstTableName);
- LOG.info("command: ./bin/hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {} {}",
+ LOG.info("command: ./bin/hbase {} {} {}", BulkLoadHFilesTool.NAME,
this.bulkloadDir.toString(), this.dstTableName);
- code = new LoadIncrementalHFiles(this.getConf())
- .run(new String[] { this.bulkloadDir.toString(), this.dstTableName });
- if (code == 0) {
- // bulkloadDir is deleted only LoadIncrementalHFiles was successful so that one can rerun
- // LoadIncrementalHFiles.
+ if (!BulkLoadHFiles.create(getConf()).bulkLoad(TableName.valueOf(dstTableName), bulkloadDir)
+ .isEmpty()) {
+ // bulkloadDir is deleted only BulkLoadHFiles was successful so that one can rerun
+ // BulkLoadHFiles.
FileSystem fs = FSUtils.getCurrentFileSystem(getConf());
if (!fs.delete(this.bulkloadDir, true)) {
LOG.error("Deleting folder " + bulkloadDir + " failed!");
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
index b48ecf0..62fc06d 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HRegionPartitioner.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.mapreduce.Partitioner;
*
* <p>This class is not suitable as partitioner creating hfiles
* for incremental bulk loads as region spread will likely change between time of
- * hfile creation and load time. See {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles}
+ * hfile creation and load time. See {@link org.apache.hadoop.hbase.tool.BulkLoadHFiles}
* and <a href="http://hbase.apache.org/book.html#arch.bulk.load">Bulk Load</a>.</p>
*
* @param <KEY> The type of the key.
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
index 5c0bb2b..c9f5a2e 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestHFileOutputFormat2.java
@@ -90,7 +90,7 @@ import org.apache.hadoop.hbase.regionserver.TestHRegionFileSystem;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.VerySlowMapReduceTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
@@ -707,18 +707,17 @@ public class TestHFileOutputFormat2 {
}
Table currentTable = allTables.get(tableNameStr);
TableName currentTableName = currentTable.getName();
- new LoadIncrementalHFiles(conf).doBulkLoad(tableDir, admin, currentTable, singleTableInfo
- .getRegionLocator());
+ BulkLoadHFiles.create(conf).bulkLoad(currentTableName, tableDir);
// Ensure data shows up
int expectedRows = 0;
if (putSortReducer) {
// no rows should be extracted
- assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+ assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
util.countRows(currentTable));
} else {
expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
- assertEquals("LoadIncrementalHFiles should put expected data in table", expectedRows,
+ assertEquals("BulkLoadHFiles should put expected data in table", expectedRows,
util.countRows(currentTable));
Scan scan = new Scan();
ResultScanner results = currentTable.getScanner(scan);
@@ -1248,14 +1247,14 @@ public class TestHFileOutputFormat2 {
for (int i = 0; i < 2; i++) {
Path testDir = util.getDataTestDirOnTestFS("testExcludeAllFromMinorCompaction_" + i);
runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
- .getTableDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
+ .getDescriptor(), conn.getRegionLocator(TABLE_NAMES[0]))), testDir, false);
// Perform the actual load
- new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, locator);
+ BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
}
// Ensure data shows up
int expectedRows = 2 * NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
- assertEquals("LoadIncrementalHFiles should put expected data in table",
+ assertEquals("BulkLoadHFiles should put expected data in table",
expectedRows, util.countRows(table));
// should have a second StoreFile now
@@ -1340,15 +1339,16 @@ public class TestHFileOutputFormat2 {
true);
RegionLocator regionLocator = conn.getRegionLocator(TABLE_NAMES[0]);
- runIncrementalPELoad(conf, Arrays.asList(new HFileOutputFormat2.TableInfo(table
- .getTableDescriptor(), regionLocator)), testDir, false);
+ runIncrementalPELoad(conf,
+ Arrays.asList(new HFileOutputFormat2.TableInfo(table.getDescriptor(), regionLocator)),
+ testDir, false);
// Perform the actual load
- new LoadIncrementalHFiles(conf).doBulkLoad(testDir, admin, table, regionLocator);
+ BulkLoadHFiles.create(conf).bulkLoad(table.getName(), testDir);
// Ensure data shows up
int expectedRows = NMapInputFormat.getNumMapTasks(conf) * ROWSPERSPLIT;
- assertEquals("LoadIncrementalHFiles should put expected data in table",
+ assertEquals("BulkLoadHFiles should put expected data in table",
expectedRows + 1, util.countRows(table));
// should have a second StoreFile now
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 0ad77ba..c3f8f8b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
@@ -66,4 +68,21 @@ public interface AsyncClusterConnection extends AsyncConnection {
*/
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload);
+
+ /**
+ * Return the token for this bulk load.
+ */
+ CompletableFuture<String> prepareBulkLoad(TableName tableName);
+
+ /**
+ * Securely bulk load a list of HFiles.
+ * @param row used to locate the region
+ */
+ CompletableFuture<Boolean> bulkLoad(TableName tableName, List<Pair<byte[], String>> familyPaths,
+ byte[] row, boolean assignSeqNum, Token<?> userToken, String bulkToken, boolean copyFiles);
+
+ /**
+ * Clean up after finishing bulk load, no matter success or not.
+ */
+ CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken);
}
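
Together these three methods replace the deleted SecureBulkLoadClient: a caller obtains a bulk token, loads the HFiles against the region containing a given row, and cleans the token up whether or not the load succeeded. A minimal sketch of that flow, assuming a hypothetical helper that already has an AsyncClusterConnection in hand (FutureUtils.get blocks on each CompletableFuture):

    import java.io.IOException;
    import java.util.List;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.AsyncClusterConnection;
    import org.apache.hadoop.hbase.util.FutureUtils;
    import org.apache.hadoop.hbase.util.Pair;

    public final class SecureBulkLoadFlowSketch {
      /** Hypothetical helper showing the prepare / load / cleanup sequence. */
      static boolean load(AsyncClusterConnection conn, TableName tableName,
          List<Pair<byte[], String>> familyPaths) throws IOException {
        String bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName));
        try {
          return FutureUtils.get(conn.bulkLoad(tableName, familyPaths,
              HConstants.EMPTY_START_ROW, // row used only to locate the target region
              true,                       // assignSeqNum
              null,                       // userToken: none in this sketch
              bulkToken,
              false));                    // copyFiles
        } finally {
          // clean up the staging token no matter success or not
          FutureUtils.get(conn.cleanupBulkLoad(tableName, bulkToken));
        }
      }
    }
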
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index d61f01f..328b959 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -21,15 +21,28 @@ import java.net.SocketAddress;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.security.token.Token;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
/**
* The implementation of AsyncClusterConnection.
@@ -77,4 +90,46 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
boolean reload) {
return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
}
+
+ @Override
+ public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
+ return callerFactory.<String> single().table(tableName).row(HConstants.EMPTY_START_ROW)
+ .action((controller, loc, stub) -> ConnectionUtils
+ .<TableName, PrepareBulkLoadRequest, PrepareBulkLoadResponse, String> call(controller, loc,
+ stub, tableName, (rn, tn) -> {
+ RegionSpecifier region =
+ RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, rn);
+ return PrepareBulkLoadRequest.newBuilder()
+ .setTableName(ProtobufUtil.toProtoTableName(tn)).setRegion(region).build();
+ }, (s, c, req, done) -> s.prepareBulkLoad(c, req, done),
+ (c, resp) -> resp.getBulkToken()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Boolean> bulkLoad(TableName tableName,
+ List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+ String bulkToken, boolean copyFiles) {
+ return callerFactory.<Boolean> single().table(tableName).row(row)
+ .action((controller, loc, stub) -> ConnectionUtils
+ .<Void, BulkLoadHFileRequest, BulkLoadHFileResponse, Boolean> call(controller, loc, stub,
+ null,
+ (rn, nil) -> RequestConverter.buildBulkLoadHFileRequest(familyPaths, rn, assignSeqNum,
+ userToken, bulkToken, copyFiles),
+ (s, c, req, done) -> s.bulkLoadHFile(c, req, done), (c, resp) -> resp.getLoaded()))
+ .call();
+ }
+
+ @Override
+ public CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken) {
+ return callerFactory.<Void> single().table(tableName).row(HConstants.EMPTY_START_ROW)
+ .action((controller, loc, stub) -> ConnectionUtils
+ .<String, CleanupBulkLoadRequest, CleanupBulkLoadResponse, Void> call(controller, loc, stub,
+ bulkToken, (rn, bt) -> {
+ RegionSpecifier region =
+ RequestConverter.buildRegionSpecifier(RegionSpecifierType.REGION_NAME, rn);
+ return CleanupBulkLoadRequest.newBuilder().setRegion(region).setBulkToken(bt).build();
+ }, (s, c, req, done) -> s.cleanupBulkLoad(c, req, done), (c, resp) -> null))
+ .call();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 2670420..46c0f5a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -19,9 +19,11 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.net.SocketAddress;
+import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -30,6 +32,9 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ClusterConnectionFactory {
+ public static final String HBASE_SERVER_CLUSTER_CONNECTION_IMPL =
+ "hbase.server.cluster.connection.impl";
+
private ClusterConnectionFactory() {
}
@@ -46,6 +51,15 @@ public final class ClusterConnectionFactory {
SocketAddress localAddress, User user) throws IOException {
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
String clusterId = FutureUtils.get(registry.getClusterId());
- return new AsyncClusterConnectionImpl(conf, registry, clusterId, localAddress, user);
+ Class<? extends AsyncClusterConnection> clazz =
+ conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class,
+ AsyncClusterConnection.class);
+ try {
+ return user
+ .runAs((PrivilegedExceptionAction<? extends AsyncClusterConnection>) () -> ReflectionUtils
+ .newInstance(clazz, conf, registry, clusterId, localAddress, user));
+ } catch (Exception e) {
+ throw new IOException(e);
+ }
}
}
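
The new hbase.server.cluster.connection.impl key mainly exists so tests can plug in their own connection, which is what the DummyAsyncClusterConnection added by this patch is for. A sketch of how a test might use it; HypotheticalTestConnection is a placeholder for whatever AsyncClusterConnection subclass the test supplies:

    // The supplied class must have a (Configuration, AsyncRegistry, String,
    // SocketAddress, User) constructor, since ClusterConnectionFactory creates it
    // reflectively via ReflectionUtils.newInstance.
    Configuration conf = HBaseConfiguration.create();
    conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
        HypotheticalTestConnection.class, AsyncClusterConnection.class);
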
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
deleted file mode 100644
index 6f5412f..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/**
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapreduce;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Tool to load the output of HFileOutputFormat into an existing table.
- * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Use
- * {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles} instead.
- */
-@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS",
- justification = "Temporary glue. To be removed")
-@Deprecated
-@InterfaceAudience.Public
-public class LoadIncrementalHFiles extends org.apache.hadoop.hbase.tool.LoadIncrementalHFiles {
-
- /**
- * @deprecated As of release 2.0.0, this will be removed in HBase 3.0.0. Use
- * {@link org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem} instead.
- */
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "NM_SAME_SIMPLE_NAME_AS_SUPERCLASS",
- justification = "Temporary glue. To be removed")
- @Deprecated
- @InterfaceAudience.Public
- public static class LoadQueueItem
- extends org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem {
-
- public LoadQueueItem(byte[] family, Path hfilePath) {
- super(family, hfilePath);
- }
- }
-
- public LoadIncrementalHFiles(Configuration conf) {
- super(conf);
- }
-
- public Map<LoadQueueItem, ByteBuffer> run(String dirPath, Map<byte[], List<Path>> map,
- TableName tableName) throws IOException {
- Map<org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> originRet;
- if (dirPath != null) {
- originRet = run(dirPath, tableName);
- } else {
- originRet = run(map, tableName);
- }
- Map<LoadQueueItem, ByteBuffer> ret = new HashMap<>();
- originRet.forEach((k, v) -> {
- ret.put(new LoadQueueItem(k.getFamily(), k.getFilePath()), v);
- });
- return ret;
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 9f1ab96..a5823ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -40,7 +40,6 @@ import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -82,8 +81,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.security.EncryptionUtil;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
@@ -91,6 +89,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
/**
* An implementation of {@link MobCompactor} that compacts the mob files in partitions.
*/
@@ -675,7 +675,7 @@ public class PartitionedMobCompactor extends MobCompactor {
cleanupTmpMobFile = false;
cleanupCommittedMobFile = true;
// bulkload the ref file
- bulkloadRefFile(connection, table, bulkloadPathOfPartition, filePath.getName());
+ bulkloadRefFile(table.getName(), bulkloadPathOfPartition, filePath.getName());
cleanupCommittedMobFile = false;
newFiles.add(new Path(mobFamilyDir, filePath.getName()));
}
@@ -818,21 +818,16 @@ public class PartitionedMobCompactor extends MobCompactor {
/**
* Bulkloads the current file.
- *
- * @param connection to use to get admin/RegionLocator
- * @param table The current table.
+ * @param tableName The table to load into.
* @param bulkloadDirectory The path of bulkload directory.
* @param fileName The current file name.
* @throws IOException if IO failure is encountered
*/
- private void bulkloadRefFile(Connection connection, Table table, Path bulkloadDirectory,
- String fileName)
+ private void bulkloadRefFile(TableName tableName, Path bulkloadDirectory, String fileName)
throws IOException {
// bulkload the ref file
try {
- LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
- bulkload.doBulkLoad(bulkloadDirectory, connection.getAdmin(), table,
- connection.getRegionLocator(table.getName()));
+ BulkLoadHFiles.create(conf).bulkLoad(tableName, bulkloadDirectory);
} catch (Exception e) {
throw new IOException(e);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index 1f44817..6204ea5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -1,17 +1,22 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -30,33 +35,32 @@ import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles.LoadQueueItem;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* It is used for replicating HFile entries. It will first copy parallely all the hfiles to a local
- * staging directory and then it will use ({@link LoadIncrementalHFiles} to prepare a collection of
+ * staging directory and then it will use ({@link BulkLoadHFiles} to prepare a collection of
* {@link LoadQueueItem} which will finally be loaded(replicated) into the table of this cluster.
*/
@InterfaceAudience.Private
@@ -82,7 +86,7 @@ public class HFileReplicator {
private FsDelegationToken fsDelegationToken;
private UserProvider userProvider;
private Configuration conf;
- private Connection connection;
+ private AsyncClusterConnection connection;
private Path hbaseStagingDir;
private ThreadPoolExecutor exec;
private int maxCopyThreads;
@@ -91,7 +95,7 @@ public class HFileReplicator {
public HFileReplicator(Configuration sourceClusterConf,
String sourceBaseNamespaceDirPath, String sourceHFileArchiveDirPath,
Map<String, List<Pair<byte[], List<String>>>> tableQueueMap, Configuration conf,
- Connection connection) throws IOException {
+ AsyncClusterConnection connection) throws IOException {
this.sourceClusterConf = sourceClusterConf;
this.sourceBaseNamespaceDirPath = sourceBaseNamespaceDirPath;
this.sourceHFileArchiveDirPath = sourceHFileArchiveDirPath;
@@ -128,96 +132,61 @@ public class HFileReplicator {
String tableNameString = tableStagingDir.getKey();
Path stagingDir = tableStagingDir.getValue();
- LoadIncrementalHFiles loadHFiles = null;
- try {
- loadHFiles = new LoadIncrementalHFiles(conf);
- } catch (Exception e) {
- LOG.error("Failed to initialize LoadIncrementalHFiles for replicating bulk loaded"
- + " data.", e);
- throw new IOException(e);
- }
- Configuration newConf = HBaseConfiguration.create(conf);
- newConf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
- loadHFiles.setConf(newConf);
-
TableName tableName = TableName.valueOf(tableNameString);
- Table table = this.connection.getTable(tableName);
// Prepare collection of queue of hfiles to be loaded(replicated)
Deque<LoadQueueItem> queue = new LinkedList<>();
- loadHFiles.prepareHFileQueue(stagingDir, table, queue, false);
+ BulkLoadHFilesTool.prepareHFileQueue(conf, connection, tableName, stagingDir, queue, false,
+ false);
if (queue.isEmpty()) {
LOG.warn("Replication process did not find any files to replicate in directory "
+ stagingDir.toUri());
return null;
}
-
- try (RegionLocator locator = connection.getRegionLocator(tableName)) {
-
- fsDelegationToken.acquireDelegationToken(sinkFs);
-
- // Set the staging directory which will be used by LoadIncrementalHFiles for loading the
- // data
- loadHFiles.setBulkToken(stagingDir.toString());
-
- doBulkLoad(loadHFiles, table, queue, locator, maxRetries);
+ fsDelegationToken.acquireDelegationToken(sinkFs);
+ try {
+ doBulkLoad(conf, tableName, stagingDir, queue, maxRetries);
} finally {
- cleanup(stagingDir.toString(), table);
+ cleanup(stagingDir);
}
}
return null;
}
- private void doBulkLoad(LoadIncrementalHFiles loadHFiles, Table table,
- Deque<LoadQueueItem> queue, RegionLocator locator, int maxRetries) throws IOException {
- int count = 0;
- Pair<byte[][], byte[][]> startEndKeys;
- while (!queue.isEmpty()) {
- // need to reload split keys each iteration.
- startEndKeys = locator.getStartEndKeys();
+ private void doBulkLoad(Configuration conf, TableName tableName, Path stagingDir,
+ Deque<LoadQueueItem> queue, int maxRetries) throws IOException {
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
+ // Set the staging directory which will be used by BulkLoadHFilesTool for loading the data
+ loader.setBulkToken(stagingDir.toString());
+ for (int count = 0; !queue.isEmpty(); count++) {
if (count != 0) {
- LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with "
- + queue.size() + " files still remaining to replicate.");
+ LOG.warn("Error occurred while replicating HFiles, retry attempt " + count + " with " +
+ queue.size() + " files still remaining to replicate.");
}
if (maxRetries != 0 && count >= maxRetries) {
- throw new IOException("Retry attempted " + count
- + " times without completing, bailing out.");
+ throw new IOException(
+ "Retry attempted " + count + " times without completing, bailing out.");
}
- count++;
// Try bulk load
- loadHFiles.loadHFileQueue(table, connection, queue, startEndKeys);
+ loader.loadHFileQueue(connection, tableName, queue, false);
}
}
- private void cleanup(String stagingDir, Table table) {
+ private void cleanup(Path stagingDir) {
// Release the file system delegation token
fsDelegationToken.releaseDelegationToken();
// Delete the staging directory
if (stagingDir != null) {
try {
- sinkFs.delete(new Path(stagingDir), true);
+ sinkFs.delete(stagingDir, true);
} catch (IOException e) {
LOG.warn("Failed to delete the staging directory " + stagingDir, e);
}
}
// Do not close the file system
-
- /*
- * if (sinkFs != null) { try { sinkFs.close(); } catch (IOException e) { LOG.warn(
- * "Failed to close the file system"); } }
- */
-
- // Close the table
- if (table != null) {
- try {
- table.close();
- } catch (IOException e) {
- LOG.warn("Failed to close the table.", e);
- }
- }
}
private Map<String, Path> copyHFilesToStagingDir() throws IOException {
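
The rewritten flow above prepares the queue with the static BulkLoadHFilesTool.prepareHFileQueue and then
drives loadHFileQueue in a caller-owned retry loop. A minimal sketch of that pattern, assuming an
AsyncClusterConnection, a Configuration and a staging directory are already available (class and method
names below, other than the HBase APIs, are illustrative only):

import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public final class ReplicationBulkLoadSketch {
  public static void load(Configuration conf, AsyncClusterConnection conn, TableName tableName,
      Path stagingDir, int maxRetries) throws IOException {
    // Discover staged hfiles and validate their families against the target table.
    Deque<LoadQueueItem> queue = new LinkedList<>();
    BulkLoadHFilesTool.prepareHFileQueue(conf, conn, tableName, stagingDir, queue, false, false);
    if (queue.isEmpty()) {
      return; // nothing staged for this table
    }
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    // Reuse the staging directory as the bulk token, as HFileReplicator does above.
    loader.setBulkToken(stagingDir.toString());
    // loadHFileQueue performs one group-and-load pass; items that could not be loaded
    // are put back on the queue, so the caller owns the retry policy.
    for (int attempt = 0; !queue.isEmpty(); attempt++) {
      if (maxRetries != 0 && attempt >= maxRetries) {
        throw new IOException("Retry attempted " + attempt + " times without completing");
      }
      loader.loadHFileQueue(conn, tableName, queue, false);
    }
  }
}
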
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index 3cd928a..e30e637 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -27,8 +26,9 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
-
+import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -40,16 +40,18 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
@@ -83,7 +85,7 @@ public class ReplicationSink {
private final Configuration conf;
// Volatile because of note in here -- look for double-checked locking:
// http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
- private volatile Connection sharedConn;
+ private volatile AsyncClusterConnection sharedConn;
private final MetricsSink metrics;
private final AtomicLong totalReplicatedEdits = new AtomicLong();
private final Object sharedConnLock = new Object();
@@ -390,37 +392,34 @@ public class ReplicationSink {
* Do the changes and handle the pool
* @param tableName table to insert into
* @param allRows list of actions
- * @throws IOException
*/
private void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
if (allRows.isEmpty()) {
return;
}
- Connection connection = getConnection();
- try (Table table = connection.getTable(tableName)) {
- for (List<Row> rows : allRows) {
- table.batch(rows, null);
- }
- } catch (RetriesExhaustedWithDetailsException rewde) {
- for (Throwable ex : rewde.getCauses()) {
- if (ex instanceof TableNotFoundException) {
+ AsyncTable<?> table = getConnection().getTable(tableName);
+ List<Future<?>> futures = allRows.stream().map(table::batchAll).collect(Collectors.toList());
+ for (Future<?> future : futures) {
+ try {
+ FutureUtils.get(future);
+ } catch (RetriesExhaustedException e) {
+ if (e.getCause() instanceof TableNotFoundException) {
throw new TableNotFoundException("'" + tableName + "'");
}
+ throw e;
}
- throw rewde;
- } catch (InterruptedException ix) {
- throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
}
}
- private Connection getConnection() throws IOException {
+ private AsyncClusterConnection getConnection() throws IOException {
// See https://en.wikipedia.org/wiki/Double-checked_locking
- Connection connection = sharedConn;
+ AsyncClusterConnection connection = sharedConn;
if (connection == null) {
synchronized (sharedConnLock) {
connection = sharedConn;
if (connection == null) {
- connection = ConnectionFactory.createConnection(conf);
+ connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null,
+ UserProvider.instantiate(conf).getCurrent());
sharedConn = connection;
}
}
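
The batch() rewrite above fans each list of rows out through AsyncTable.batchAll and then joins on the
returned futures with FutureUtils.get. A minimal sketch of that submit-then-wait pattern, assuming an
AsyncClusterConnection is already available (class and method names other than the HBase APIs are
illustrative):

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.util.FutureUtils;

final class AsyncBatchSketch {
  static void batch(AsyncClusterConnection conn, TableName tableName,
      Collection<List<Row>> allRows) throws IOException {
    AsyncTable<?> table = conn.getTable(tableName);
    // Submit every batch first so they run concurrently ...
    List<Future<?>> futures =
        allRows.stream().map(table::batchAll).collect(Collectors.toList());
    // ... then block on each one; FutureUtils.get surfaces failures as IOExceptions.
    for (Future<?> future : futures) {
      FutureUtils.get(future);
    }
  }
}
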
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntrySinkFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntrySinkFilter.java
index f0b13e1..6f6ae1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntrySinkFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntrySinkFilter.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,9 +18,10 @@
package org.apache.hadoop.hbase.replication.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
/**
* Implementations are installed on a Replication Sink called from inside
@@ -36,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* source-side.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
+@InterfaceStability.Evolving
public interface WALEntrySinkFilter {
/**
* Name of configuration to set with name of implementing WALEntrySinkFilter class.
@@ -46,7 +48,7 @@ public interface WALEntrySinkFilter {
* Called after Construction.
* Use passed Connection to keep any context the filter might need.
*/
- void init(Connection connection);
+ void init(AsyncConnection conn);
/**
* @param table Table edit is destined for.
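
With the signature change above, a filter now receives an AsyncConnection at initialization. A minimal
sketch of an implementation, assuming the interface's remaining method is filter(TableName table,
long writeTime) returning true to drop the edit, as in the existing upstream interface; the class name
and age threshold are illustrative:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.replication.regionserver.WALEntrySinkFilter;

public class AgeLimitingWALEntrySinkFilter implements WALEntrySinkFilter {
  // Illustrative threshold: drop edits older than one hour.
  private static final long MAX_AGE_MS = 60 * 60 * 1000L;
  private AsyncConnection conn;

  @Override
  public void init(AsyncConnection conn) {
    // Keep the connection around for any lookups the filter may need later.
    this.conn = conn;
  }

  @Override
  public boolean filter(TableName table, long writeTime) {
    // Returning true filters the edit out (assumed semantics of the interface).
    return System.currentTimeMillis() - writeTime > MAX_AGE_MS;
  }
}
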
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 795bd66..3b1510b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -17,48 +17,1026 @@
*/
package org.apache.hadoop.hbase.tool;
+import static java.lang.String.format;
+
+import java.io.FileNotFoundException;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.io.HFileLink;
+import org.apache.hadoop.hbase.io.HalfStoreFileReader;
+import org.apache.hadoop.hbase.io.Reference;
+import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.BloomType;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
+import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.security.token.FsDelegationToken;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The implementation for {@link BulkLoadHFiles}, and also can be executed from command line as a
* tool.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles {
+public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, Tool {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BulkLoadHFilesTool.class);
public static final String NAME = "completebulkload";
+ // We use a '.' prefix which is ignored when walking directory trees
+ // above. It is an invalid family name.
+ static final String TMP_DIR = ".tmp";
+
+ private final int maxFilesPerRegionPerFamily;
+ private final boolean assignSeqIds;
+
+ // Source delegation token
+ private final FsDelegationToken fsDelegationToken;
+ private final UserProvider userProvider;
+ private final int nrThreads;
+ private final AtomicInteger numRetries = new AtomicInteger(0);
+ private String bulkToken;
+
public BulkLoadHFilesTool(Configuration conf) {
- super(conf);
+ // make a copy, just to be sure we're not overriding someone else's config
+ super(new Configuration(conf));
+ // disable blockcache for tool invocation, see HBASE-10500
+ conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
+ userProvider = UserProvider.instantiate(conf);
+ fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
+ assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
+ maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
+ nrThreads =
+ conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
+ }
+
+ // Initialize a thread pool
+ private ExecutorService createExecutorService() {
+ ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<>(),
+ new ThreadFactoryBuilder().setNameFormat("BulkLoadHFilesTool-%1$d").build());
+ pool.allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
+ private boolean isCreateTable() {
+ return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
+ }
+
+ private boolean isSilence() {
+ return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
+ }
+
+ private boolean isAlwaysCopyFiles() {
+ return getConf().getBoolean(ALWAYS_COPY_FILES, false);
+ }
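
The helpers above read their switches from the Configuration the tool is constructed with; the key names
are constants defined on the BulkLoadHFiles interface. A minimal configuration sketch (the chosen values
are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

final class BulkLoadConfigSketch {
  static BulkLoadHFilesTool newLoader() {
    Configuration conf = HBaseConfiguration.create();
    conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no");          // do not create a missing table
    conf.set(BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY, "yes");  // tolerate unmatched families
    conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);       // copy hfiles instead of moving
    conf.setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 64); // raise the default cap of 32
    return new BulkLoadHFilesTool(conf);
  }
}
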
+
+ private static boolean shouldCopyHFileMetaKey(byte[] key) {
+ // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
+ if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
+ return false;
+ }
+
+ return !HFile.isReservedFileInfoKey(key);
+ }
+
+ /**
+ * Checks whether there is any invalid family name in HFiles to be bulk loaded.
+ */
+ private static void validateFamiliesInHFiles(TableDescriptor tableDesc,
+ Deque<LoadQueueItem> queue, boolean silence) throws IOException {
+ Set<String> familyNames = Arrays.asList(tableDesc.getColumnFamilies()).stream()
+ .map(f -> f.getNameAsString()).collect(Collectors.toSet());
+ List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
+ .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
+ if (unmatchedFamilies.size() > 0) {
+ String msg =
+ "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
+ unmatchedFamilies + "; valid family names of table " + tableDesc.getTableName() +
+ " are: " + familyNames;
+ LOG.error(msg);
+ if (!silence) {
+ throw new IOException(msg);
+ }
+ }
+ }
+
+ /**
+ * Populate the Queue with given HFiles
+ */
+ private static void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
+ map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
+ }
+
+ private interface BulkHFileVisitor<TFamily> {
+
+ TFamily bulkFamily(byte[] familyName) throws IOException;
+
+ void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
+ }
+
+ /**
+ * Iterate over the bulkDir hfiles. Skip references, HFileLinks, and files starting with "_". Check and
+ * skip non-valid hfiles by default, or skip this validation by setting
+ * 'hbase.loadincremental.validate.hfile' to false.
+ */
+ private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
+ BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
+ FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
+ for (FileStatus familyStat : familyDirStatuses) {
+ if (!familyStat.isDirectory()) {
+ LOG.warn("Skipping non-directory " + familyStat.getPath());
+ continue;
+ }
+ Path familyDir = familyStat.getPath();
+ byte[] familyName = Bytes.toBytes(familyDir.getName());
+ // Skip invalid family
+ try {
+ ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Skipping invalid " + familyStat.getPath());
+ continue;
+ }
+ TFamily family = visitor.bulkFamily(familyName);
+
+ FileStatus[] hfileStatuses = fs.listStatus(familyDir);
+ for (FileStatus hfileStatus : hfileStatuses) {
+ if (!fs.isFile(hfileStatus.getPath())) {
+ LOG.warn("Skipping non-file " + hfileStatus);
+ continue;
+ }
+
+ Path hfile = hfileStatus.getPath();
+ // Skip "_", reference, HFileLink
+ String fileName = hfile.getName();
+ if (fileName.startsWith("_")) {
+ continue;
+ }
+ if (StoreFileInfo.isReference(fileName)) {
+ LOG.warn("Skipping reference " + fileName);
+ continue;
+ }
+ if (HFileLink.isHFileLink(fileName)) {
+ LOG.warn("Skipping HFileLink " + fileName);
+ continue;
+ }
+
+ // Validate HFile Format if needed
+ if (validateHFile) {
+ try {
+ if (!HFile.isHFileFormat(fs, hfile)) {
+ LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
+ continue;
+ }
+ } catch (FileNotFoundException e) {
+ LOG.warn("the file " + hfile + " was removed");
+ continue;
+ }
+ }
+
+ visitor.bulkHFile(family, hfileStatus);
+ }
+ }
+ }
+
+ /**
+ * Walk the given directory for all HFiles, and return a Queue containing all such files.
+ */
+ private static void discoverLoadQueue(Configuration conf, Deque<LoadQueueItem> ret, Path hfofDir,
+ boolean validateHFile) throws IOException {
+ visitBulkHFiles(hfofDir.getFileSystem(conf), hfofDir, new BulkHFileVisitor<byte[]>() {
+ @Override
+ public byte[] bulkFamily(final byte[] familyName) {
+ return familyName;
+ }
+
+ @Override
+ public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+ long length = hfile.getLen();
+ if (length > conf.getLong(HConstants.HREGION_MAX_FILESIZE,
+ HConstants.DEFAULT_MAX_FILE_SIZE)) {
+ LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
+ " bytes can be problematic as it may lead to oversplitting.");
+ }
+ ret.add(new LoadQueueItem(family, hfile.getPath()));
+ }
+ }, validateHFile);
+ }
+
+ /**
+ * Prepare a collection of {@code LoadQueueItem} from the passed map of family to hfile paths
+ * and validate whether the prepared queue has all the valid table column families in it.
+ * @param map map of family to List of hfiles
+ * @param tableName table to which hfiles should be loaded
+ * @param queue queue which needs to be loaded into the table
+ * @param silence true to ignore unmatched column families
+ * @throws IOException If any I/O or network error occurred
+ */
+ public static void prepareHFileQueue(AsyncClusterConnection conn, TableName tableName,
+ Map<byte[], List<Path>> map, Deque<LoadQueueItem> queue, boolean silence) throws IOException {
+ populateLoadQueue(queue, map);
+ validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
+ silence);
+ }
+
+ /**
+ * Prepare a collection of {@code LoadQueueItem} from the list of source hfiles contained in the
+ * passed directory and validate whether the prepared queue has all the valid table column
+ * families in it.
+ * @param hfilesDir directory containing list of hfiles to be loaded into the table
+ * @param queue queue which needs to be loaded into the table
+ * @param validateHFile if true hfiles will be validated for its format
+ * @param silence true to ignore unmatched column families
+ * @throws IOException If any I/O or network error occurred
+ */
+ public static void prepareHFileQueue(Configuration conf, AsyncClusterConnection conn,
+ TableName tableName, Path hfilesDir, Deque<LoadQueueItem> queue, boolean validateHFile,
+ boolean silence) throws IOException {
+ discoverLoadQueue(conf, queue, hfilesDir, validateHFile);
+ validateFamiliesInHFiles(FutureUtils.get(conn.getAdmin().getDescriptor(tableName)), queue,
+ silence);
+ }
+
+ /**
+ * Used by the replication sink to load the hfiles from the source cluster. It does the following:
+ * <ol>
+ * <li>{@link #groupOrSplitPhase(AsyncClusterConnection, TableName, ExecutorService, Deque, List)}
+ * </li>
+ * <li>{@link #bulkLoadPhase(AsyncClusterConnection, TableName, Deque, Multimap, boolean, Map)}
+ * </li>
+ * </ol>
+ * @param conn Connection to use
+ * @param tableName Table to which these hfiles should be loaded to
+ * @param queue queue of {@code LoadQueueItem}s with hfiles yet to be loaded
+ */
+ public void loadHFileQueue(AsyncClusterConnection conn, TableName tableName,
+ Deque<LoadQueueItem> queue, boolean copyFiles) throws IOException {
+ ExecutorService pool = createExecutorService();
+ try {
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups = groupOrSplitPhase(conn, tableName, pool,
+ queue, FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys())).getFirst();
+ bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, null);
+ } finally {
+ pool.shutdown();
+ }
+ }
+
+ /**
+ * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
+ * re-queued for another pass with the groupOrSplitPhase.
+ * <p/>
+ * protected for testing.
+ */
+ @VisibleForTesting
+ protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
+ Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ // atomically bulk load the groups.
+ List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
+ for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
+ .entrySet()) {
+ byte[] first = entry.getKey().array();
+ final Collection<LoadQueueItem> lqis = entry.getValue();
+ List<Pair<byte[], String>> familyPaths =
+ lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
+ .collect(Collectors.toList());
+ CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
+ FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
+ fsDelegationToken.getUserToken(), bulkToken, copyFiles), (loaded, error) -> {
+ if (error != null) {
+ LOG.error("Encountered unrecoverable error from region server", error);
+ if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
+ numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+ LOG.warn("Will attempt to retry loading failed HFiles. Retry #" +
+ numRetries.incrementAndGet());
+ // return lqi's to retry
+ future.complete(lqis);
+ } else {
+ LOG.error(RETRY_ON_IO_EXCEPTION +
+ " is disabled or we have reached retry limit. Unable to recover");
+ future.completeExceptionally(error);
+ }
+ } else {
+ if (loaded) {
+ future.complete(Collections.emptyList());
+ } else {
+ LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
+ " into table " + tableName + " with files " + lqis +
+ " failed. This is recoverable and they will be retried.");
+ // return lqi's to retry
+ future.complete(lqis);
+ }
+ }
+ });
+ loadingFutures.add(future);
+ if (item2RegionMap != null) {
+ for (LoadQueueItem lqi : lqis) {
+ item2RegionMap.put(lqi, entry.getKey());
+ }
+ }
+ }
+
+ // get all the results.
+ for (Future<Collection<LoadQueueItem>> future : loadingFutures) {
+ try {
+ Collection<LoadQueueItem> toRetry = future.get();
+
+ for (LoadQueueItem lqi : toRetry) {
+ item2RegionMap.remove(lqi);
+ }
+ // LQIs that are requeued to be regrouped.
+ queue.addAll(toRetry);
+ } catch (ExecutionException e1) {
+ Throwable t = e1.getCause();
+ if (t instanceof IOException) {
+ // At this point something unrecoverable has happened.
+ // TODO Implement bulk load recovery
+ throw new IOException("BulkLoad encountered an unrecoverable problem", t);
+ }
+ LOG.error("Unexpected execution exception during bulk load", e1);
+ throw new IllegalStateException(t);
+ } catch (InterruptedException e1) {
+ LOG.error("Unexpected interrupted exception during bulk load", e1);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
+ }
+ }
+ }
+
+ private boolean checkHFilesCountPerRegionPerFamily(
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
+ for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
+ Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ for (LoadQueueItem lqi : e.getValue()) {
+ MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
+ count.increment();
+ if (count.intValue() > maxFilesPerRegionPerFamily) {
+ LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
+ " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
+ " of region with start key " + Bytes.toStringBinary(e.getKey()));
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ /**
+ * @param conn the connection to the cluster
+ * @param tableName the table to load into
+ * @param pool the ExecutorService
+ * @param queue the queue for LoadQueueItem
+ * @param startEndKeys start and end keys
+ * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
+ */
+ private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
+ AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
+ Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
+ // <region start key, LQI> need synchronized only within this scope of this
+ // phase because of the puts that happen in futures.
+ Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
+ final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
+ Set<String> missingHFiles = new HashSet<>();
+ Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
+ new Pair<>(regionGroups, missingHFiles);
+
+ // drain LQIs and figure out bulk load groups
+ Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
+ while (!queue.isEmpty()) {
+ final LoadQueueItem item = queue.remove();
+
+ final Callable<Pair<List<LoadQueueItem>, String>> call =
+ new Callable<Pair<List<LoadQueueItem>, String>>() {
+ @Override
+ public Pair<List<LoadQueueItem>, String> call() throws Exception {
+ Pair<List<LoadQueueItem>, String> splits =
+ groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
+ return splits;
+ }
+ };
+ splittingFutures.add(pool.submit(call));
+ }
+ // get all the results. All grouping and splitting must finish before
+ // we can attempt the atomic loads.
+ for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
+ try {
+ Pair<List<LoadQueueItem>, String> splits = lqis.get();
+ if (splits != null) {
+ if (splits.getFirst() != null) {
+ queue.addAll(splits.getFirst());
+ } else {
+ missingHFiles.add(splits.getSecond());
+ }
+ }
+ } catch (ExecutionException e1) {
+ Throwable t = e1.getCause();
+ if (t instanceof IOException) {
+ LOG.error("IOException during splitting", e1);
+ throw (IOException) t; // would have been thrown if not parallelized,
+ }
+ LOG.error("Unexpected execution exception during splitting", e1);
+ throw new IllegalStateException(t);
+ } catch (InterruptedException e1) {
+ LOG.error("Unexpected interrupted exception during splitting", e1);
+ throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
+ }
+ }
+ return pair;
+ }
+
+ // unique file name for the table
+ private String getUniqueName() {
+ return UUID.randomUUID().toString().replaceAll("-", "");
+ }
+
+ private List<LoadQueueItem> splitStoreFile(LoadQueueItem item, TableDescriptor tableDesc,
+ byte[] startKey, byte[] splitKey) throws IOException {
+ Path hfilePath = item.getFilePath();
+ byte[] family = item.getFamily();
+ Path tmpDir = hfilePath.getParent();
+ if (!tmpDir.getName().equals(TMP_DIR)) {
+ tmpDir = new Path(tmpDir, TMP_DIR);
+ }
+
+ LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");
+
+ String uniqueName = getUniqueName();
+ ColumnFamilyDescriptor familyDesc = tableDesc.getColumnFamily(family);
+
+ Path botOut = new Path(tmpDir, uniqueName + ".bottom");
+ Path topOut = new Path(tmpDir, uniqueName + ".top");
+ splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
+
+ FileSystem fs = tmpDir.getFileSystem(getConf());
+ fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
+ fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
+
+ // Add these back at the *front* of the queue, so there's a lower
+ // chance that the region will just split again before we get there.
+ List<LoadQueueItem> lqis = new ArrayList<>(2);
+ lqis.add(new LoadQueueItem(family, botOut));
+ lqis.add(new LoadQueueItem(family, topOut));
+
+ // If the current item is already the result of previous splits,
+ // we don't need it anymore. Clean up to save space.
+ // It is not part of the original input files.
+ try {
+ if (tmpDir.getName().equals(TMP_DIR)) {
+ fs.delete(hfilePath, false);
+ }
+ } catch (IOException e) {
+ LOG.warn("Unable to delete temporary split file " + hfilePath);
+ }
+ LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
+ return lqis;
+ }
+
+ /**
+ * Attempt to assign the given load queue item into its target region group. If the hfile boundary
+ * no longer fits into a region, physically splits the hfile such that the new bottom half will
+ * fit and returns the list of LQI's corresponding to the resultant hfiles.
+ * <p/>
+ * protected for testing
+ * @throws IOException if an IO failure is encountered
+ */
+ @VisibleForTesting
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
+ TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
+ List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
+ Path hfilePath = item.getFilePath();
+ Optional<byte[]> first, last;
+ try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
+ CacheConfig.DISABLED, true, getConf())) {
+ hfr.loadFileInfo();
+ first = hfr.getFirstRowKey();
+ last = hfr.getLastRowKey();
+ } catch (FileNotFoundException fnfe) {
+ LOG.debug("encountered", fnfe);
+ return new Pair<>(null, hfilePath.getName());
+ }
+
+ LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
+ " last=" + last.map(Bytes::toStringBinary));
+ if (!first.isPresent() || !last.isPresent()) {
+ assert !first.isPresent() && !last.isPresent();
+ // TODO what if this is due to a bad HFile?
+ LOG.info("hfile " + hfilePath + " has no entries, skipping");
+ return null;
+ }
+ if (Bytes.compareTo(first.get(), last.get()) > 0) {
+ throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
+ " > " + Bytes.toStringBinary(last.get()));
+ }
+ int idx =
+ Collections.binarySearch(startEndKeys, Pair.newPair(first.get(), HConstants.EMPTY_END_ROW),
+ (p1, p2) -> Bytes.compareTo(p1.getFirst(), p2.getFirst()));
+ if (idx < 0) {
+ // not on boundary, returns -(insertion index). Calculate region it
+ // would be in.
+ idx = -(idx + 1) - 1;
+ }
+ int indexForCallable = idx;
+
+ /**
+ * We can consider there to be a region hole in the following conditions: 1) if idx < 0, then the
+ * first region info is lost; 2) if the endkey of a region is not equal to the startkey of the
+ * next region; 3) if the endkey of the last region is not empty.
+ */
+ if (indexForCallable < 0) {
+ throw new IOException("The first region info for table " + tableName +
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+ } else if ((indexForCallable == startEndKeys.size() - 1) &&
+ !Bytes.equals(startEndKeys.get(indexForCallable).getSecond(), HConstants.EMPTY_BYTE_ARRAY)) {
+ throw new IOException("The last region info for table " + tableName +
+ " can't be found in hbase:meta.Please use hbck tool to fix it first.");
+ } else if (indexForCallable + 1 < startEndKeys.size() &&
+ !(Bytes.compareTo(startEndKeys.get(indexForCallable).getSecond(),
+ startEndKeys.get(indexForCallable + 1).getFirst()) == 0)) {
+ throw new IOException("The endkey of one region for table " + tableName +
+ " is not equal to the startkey of the next region in hbase:meta." +
+ "Please use hbck tool to fix it first.");
+ }
+
+ boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.get(idx).getSecond()) < 0 ||
+ Bytes.equals(startEndKeys.get(idx).getSecond(), HConstants.EMPTY_BYTE_ARRAY);
+ if (!lastKeyInRange) {
+ Pair<byte[], byte[]> startEndKey = startEndKeys.get(indexForCallable);
+ List<LoadQueueItem> lqis =
+ splitStoreFile(item, FutureUtils.get(conn.getAdmin().getDescriptor(tableName)),
+ startEndKey.getFirst(), startEndKey.getSecond());
+ return new Pair<>(lqis, null);
+ }
+
+ // group regions.
+ regionGroups.put(ByteBuffer.wrap(startEndKeys.get(idx).getFirst()), item);
+ return null;
+ }
+
+ /**
+ * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
+ * filters, etc.
+ */
+ @VisibleForTesting
+ static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
+ byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
+ // Open reader with no block cache, and not in-memory
+ Reference topReference = Reference.createTopReference(splitKey);
+ Reference bottomReference = Reference.createBottomReference(splitKey);
+
+ copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
+ copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
+ }
+
+ /**
+ * Copy half of an HFile into a new HFile.
+ */
+ private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
+ Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
+ FileSystem fs = inFile.getFileSystem(conf);
+ CacheConfig cacheConf = CacheConfig.DISABLED;
+ HalfStoreFileReader halfReader = null;
+ StoreFileWriter halfWriter = null;
+ try {
+ halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
+ new AtomicInteger(0), true, conf);
+ Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
+
+ int blocksize = familyDescriptor.getBlocksize();
+ Algorithm compression = familyDescriptor.getCompressionType();
+ BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
+ HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
+ .withChecksumType(HStore.getChecksumType(conf))
+ .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
+ .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
+ .build();
+ halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
+ .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
+ HFileScanner scanner = halfReader.getScanner(false, false, false);
+ scanner.seekTo();
+ do {
+ halfWriter.append(scanner.getCell());
+ } while (scanner.next());
+
+ for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
+ if (shouldCopyHFileMetaKey(entry.getKey())) {
+ halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
+ }
+ }
+ } finally {
+ if (halfReader != null) {
+ try {
+ halfReader.close(cacheConf.shouldEvictOnClose());
+ } catch (IOException e) {
+ LOG.warn("failed to close hfile reader for " + inFile, e);
+ }
+ }
+ if (halfWriter != null) {
+ halfWriter.close();
+ }
+ }
+ }
+
+ /**
+ * Infers region boundaries for a new table.
+ * <p/>
+ * Parameter: <br/>
+ * bdryMap is a map from keys to an integer belonging to {+1, -1}
+ * <ul>
+ * <li>If a key is a start key of a file, then it maps to +1</li>
+ * <li>If a key is an end key of a file, then it maps to -1</li>
+ * </ul>
+ * <p>
+ * Algo:<br/>
+ * <ol>
+ * <li>Poll on the keys in order:
+ * <ol type="a">
+ * <li>Keep adding the mapped values to these keys (runningSum)</li>
+ * <li>Each time runningSum reaches 0, add the start Key from when the runningSum had started to a
+ * boundary list.</li>
+ * </ol>
+ * </li>
+ * <li>Return the boundary list.</li>
+ * </ol>
+ */
+ public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
+ List<byte[]> keysArray = new ArrayList<>();
+ int runningValue = 0;
+ byte[] currStartKey = null;
+ boolean firstBoundary = true;
+
+ for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
+ if (runningValue == 0) {
+ currStartKey = item.getKey();
+ }
+ runningValue += item.getValue();
+ if (runningValue == 0) {
+ if (!firstBoundary) {
+ keysArray.add(currStartKey);
+ }
+ firstBoundary = false;
+ }
+ }
+
+ return keysArray.toArray(new byte[0][]);
+ }
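
A small worked example of the boundary inference, with illustrative single-letter row keys: three hfiles
spanning [a,c], [b,d] and [e,f]. The first two overlap and collapse into one region, so the only split
key returned is the start key of the third file:

import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;

final class InferBoundariesExample {
  static byte[][] boundaries() {
    SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    map.put(Bytes.toBytes("a"), 1);   // start of file 1
    map.put(Bytes.toBytes("b"), 1);   // start of file 2
    map.put(Bytes.toBytes("c"), -1);  // end of file 1
    map.put(Bytes.toBytes("d"), -1);  // end of file 2
    map.put(Bytes.toBytes("e"), 1);   // start of file 3
    map.put(Bytes.toBytes("f"), -1);  // end of file 3
    // The running sum returns to zero at "d" (first boundary, skipped) and at "f",
    // so the returned split keys are { "e" }: regions [-inf, "e") and ["e", +inf).
    return BulkLoadHFilesTool.inferBoundaries(map);
  }
}
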
+
+ /**
+ * If the table is created for the first time, then "completebulkload" reads the files twice. More
+ * modifications are necessary if we want to avoid doing it.
+ */
+ private void createTable(TableName tableName, Path hfofDir, AsyncAdmin admin) throws IOException {
+ final FileSystem fs = hfofDir.getFileSystem(getConf());
+
+ // Add column families
+ // Build a set of keys
+ List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
+ SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
+ @Override
+ public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
+ ColumnFamilyDescriptorBuilder builder =
+ ColumnFamilyDescriptorBuilder.newBuilder(familyName);
+ familyBuilders.add(builder);
+ return builder;
+ }
+
+ @Override
+ public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
+ throws IOException {
+ Path hfile = hfileStatus.getPath();
+ try (HFile.Reader reader =
+ HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
+ if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
+ builder.setCompressionType(reader.getFileContext().getCompression());
+ LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
+ " for family " + builder.getNameAsString());
+ }
+ reader.loadFileInfo();
+ byte[] first = reader.getFirstRowKey().get();
+ byte[] last = reader.getLastRowKey().get();
+
+ LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
+ Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
+
+ // To eventually infer start key-end key boundaries
+ Integer value = map.containsKey(first) ? map.get(first) : 0;
+ map.put(first, value + 1);
+
+ value = map.containsKey(last) ? map.get(last) : 0;
+ map.put(last, value - 1);
+ }
+ }
+ }, true);
+
+ byte[][] keys = inferBoundaries(map);
+ TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
+ familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
+ .forEachOrdered(tdBuilder::setColumnFamily);
+ FutureUtils.get(admin.createTable(tdBuilder.build(), keys));
+
+ LOG.info("Table " + tableName + " is available!!");
+ }
+
+ private Map<LoadQueueItem, ByteBuffer> performBulkLoad(AsyncClusterConnection conn,
+ TableName tableName, Deque<LoadQueueItem> queue, ExecutorService pool, boolean copyFile)
+ throws IOException {
+ int count = 0;
+
+ fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
+ bulkToken = FutureUtils.get(conn.prepareBulkLoad(tableName));
+ Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
+
+ Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
+ // Assumes that region splits can happen while this occurs.
+ while (!queue.isEmpty()) {
+ // need to reload split keys each iteration.
+ final List<Pair<byte[], byte[]>> startEndKeys =
+ FutureUtils.get(conn.getRegionLocator(tableName).getStartEndKeys());
+ if (count != 0) {
+ LOG.info("Split occurred while grouping HFiles, retry attempt " + +count + " with " +
+ queue.size() + " files remaining to group or split");
+ }
+
+ int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
+ maxRetries = Math.max(maxRetries, startEndKeys.size() + 1);
+ if (maxRetries != 0 && count >= maxRetries) {
+ throw new IOException(
+ "Retry attempted " + count + " times without completing, bailing out");
+ }
+ count++;
+
+ // Using ByteBuffer for byte[] equality semantics
+ pair = groupOrSplitPhase(conn, tableName, pool, queue, startEndKeys);
+ Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
+
+ if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
+ // Error is logged inside checkHFilesCountPerRegionPerFamily.
+ throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
+ " hfiles to one family of one region");
+ }
+
+ bulkLoadPhase(conn, tableName, queue, regionGroups, copyFile, item2RegionMap);
+
+ // NOTE: The next iteration's split / group could happen in parallel to
+ // atomic bulkloads assuming that there are splits and no merges, and
+ // that we can atomically pull out the groups we want to retry.
+ }
+
+ if (!queue.isEmpty()) {
+ throw new RuntimeException(
+ "Bulk load aborted with some files not yet loaded." + "Please check log for more details.");
+ }
+ return item2RegionMap;
+ }
+
+ private void cleanup(AsyncClusterConnection conn, TableName tableName, Deque<LoadQueueItem> queue,
+ ExecutorService pool) throws IOException {
+ fsDelegationToken.releaseDelegationToken();
+ if (bulkToken != null) {
+ conn.cleanupBulkLoad(tableName, bulkToken);
+ }
+ if (pool != null) {
+ pool.shutdown();
+ }
+ if (!queue.isEmpty()) {
+ StringBuilder err = new StringBuilder();
+ err.append("-------------------------------------------------\n");
+ err.append("Bulk load aborted with some files not yet loaded:\n");
+ err.append("-------------------------------------------------\n");
+ for (LoadQueueItem q : queue) {
+ err.append(" ").append(q.getFilePath()).append('\n');
+ }
+ LOG.error(err.toString());
+ }
}
- private Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> convert(
- Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> map) {
- return map.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+ /**
+ * Perform a bulk load of the given map of family to hfile paths into the given pre-existing
+ * table. This method is not threadsafe.
+ * @param map map of family to List of hfiles
+ * @param tableName table to load the hfiles
+ * @param silence true to ignore unmatched column families
+ * @param copyFile always copy hfiles if true
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
+ TableName tableName, Map<byte[], List<Path>> map, boolean silence, boolean copyFile)
+ throws TableNotFoundException, IOException {
+ if (!FutureUtils.get(conn.getAdmin().isTableAvailable(tableName))) {
+ throw new TableNotFoundException("Table " + tableName + " is not currently available.");
+ }
+ // LQI queue does not need to be threadsafe -- all operations on this queue
+ // happen in this thread
+ Deque<LoadQueueItem> queue = new ArrayDeque<>();
+ ExecutorService pool = null;
+ try {
+ prepareHFileQueue(conn, tableName, map, queue, silence);
+ if (queue.isEmpty()) {
+ LOG.warn("Bulk load operation did not get any files to load");
+ return Collections.emptyMap();
+ }
+ pool = createExecutorService();
+ return performBulkLoad(conn, tableName, queue, pool, copyFile);
+ } finally {
+ cleanup(conn, tableName, queue, pool);
+ }
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given pre-existing table. This method is
+ * not threadsafe.
+ * @param tableName table to load the hfiles
+ * @param hfofDir the directory that was provided as the output path of a job using
+ * HFileOutputFormat
+ * @param silence true to ignore unmatched column families
+ * @param copyFile always copy hfiles if true
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ private Map<LoadQueueItem, ByteBuffer> doBulkLoad(AsyncClusterConnection conn,
+ TableName tableName, Path hfofDir, boolean silence, boolean copyFile)
+ throws TableNotFoundException, IOException {
+ if (!FutureUtils.get(conn.getAdmin().isTableAvailable(tableName))) {
+ throw new TableNotFoundException("Table " + tableName + " is not currently available.");
+ }
+
+ /*
+ * Checking hfile format is a time-consuming operation, we should have an option to skip this
+ * step when bulkloading millions of HFiles. See HBASE-13985.
+ */
+ boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
+ if (!validateHFile) {
+ LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
+ "are not correct. If you fail to read data from your table after using this " +
+ "option, consider removing the files and bulkload again without this option. " +
+ "See HBASE-13985");
+ }
+ // LQI queue does not need to be threadsafe -- all operations on this queue
+ // happen in this thread
+ Deque<LoadQueueItem> queue = new ArrayDeque<>();
+ ExecutorService pool = null;
+ try {
+ prepareHFileQueue(getConf(), conn, tableName, hfofDir, queue, validateHFile, silence);
+
+ if (queue.isEmpty()) {
+ LOG.warn(
+ "Bulk load operation did not find any files to load in directory {}. " +
+ "Does it contain files in subdirectories that correspond to column family names?",
+ (hfofDir != null ? hfofDir.toUri().toString() : ""));
+ return Collections.emptyMap();
+ }
+ pool = createExecutorService();
+ return performBulkLoad(conn, tableName, queue, pool, copyFile);
+ } finally {
+ cleanup(conn, tableName, queue, pool);
+ }
}
@Override
- public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
+ public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException {
- return convert(run(family2Files, tableName));
+ try (AsyncClusterConnection conn = ClusterConnectionFactory
+ .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
+ if (!FutureUtils.get(conn.getAdmin().tableExists(tableName))) {
+ String errorMsg = format("Table '%s' does not exist.", tableName);
+ LOG.error(errorMsg);
+ throw new TableNotFoundException(errorMsg);
+ }
+ return doBulkLoad(conn, tableName, family2Files, isSilence(), isAlwaysCopyFiles());
+ }
}
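
The map-based entry point above takes an explicit family-to-files mapping, which is useful when the
hfiles are not laid out as a HFileOutputFormat output directory. A minimal sketch, with an illustrative
table name, column family and staging paths:

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;

final class Family2FilesBulkLoadSketch {
  static void run() throws IOException {
    Configuration conf = HBaseConfiguration.create();
    Map<byte[], List<Path>> family2Files = new HashMap<>();
    // One entry per column family, each pointing at the staged hfiles for that family.
    family2Files.put(Bytes.toBytes("cf"),
        Arrays.asList(new Path("/staging/cf/hfile1"), new Path("/staging/cf/hfile2")));
    new BulkLoadHFilesTool(conf).bulkLoad(TableName.valueOf("mytable"), family2Files);
  }
}
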
@Override
- public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
+ public Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
throws TableNotFoundException, IOException {
- return convert(run(dir, tableName));
+ try (AsyncClusterConnection conn = ClusterConnectionFactory
+ .createAsyncClusterConnection(getConf(), null, userProvider.getCurrent())) {
+ AsyncAdmin admin = conn.getAdmin();
+ if (!FutureUtils.get(admin.tableExists(tableName))) {
+ if (isCreateTable()) {
+ createTable(tableName, dir, admin);
+ } else {
+ String errorMsg = format("Table '%s' does not exist.", tableName);
+ LOG.error(errorMsg);
+ throw new TableNotFoundException(errorMsg);
+ }
+ }
+ return doBulkLoad(conn, tableName, dir, isSilence(), isAlwaysCopyFiles());
+ }
+ }
+
+ public void setBulkToken(String bulkToken) {
+ this.bulkToken = bulkToken;
+ }
+
+ private void usage() {
+ System.err
+ .println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable" +
+ "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by " +
+ "this tool\n Note: if you set this to 'no', then the target table must already exist " +
+ "in HBase\n -loadTable implies your baseDirectory to store file has a depth of 3 ,you" +
+ " must have an existing table\n-D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used " +
+ "to ignore unmatched column families\n" + "\n");
+ }
+
+ @Override
+ public int run(String[] args) throws Exception {
+ if (args.length != 2 && args.length != 3) {
+ usage();
+ return -1;
+ }
+ Path dirPath = new Path(args[0]);
+ TableName tableName = TableName.valueOf(args[1]);
+
+ if (args.length == 2) {
+ return !bulkLoad(tableName, dirPath).isEmpty() ? 0 : -1;
+ } else {
+ Map<byte[], List<Path>> family2Files = Maps.newHashMap();
+ FileSystem fs = FileSystem.get(getConf());
+ for (FileStatus regionDir : fs.listStatus(dirPath)) {
+ FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
+ Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
+ byte[] familyName = Bytes.toBytes(family);
+ if (family2Files.containsKey(familyName)) {
+ family2Files.get(familyName).add(path);
+ } else {
+ family2Files.put(familyName, Lists.newArrayList(path));
+ }
+ });
+ }
+ return !bulkLoad(tableName, family2Files).isEmpty() ? 0 : -1;
+ }
}
public static void main(String[] args) throws Exception {
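
The same run() method backs command-line use; since the class implements Tool, it can be launched
through the hbase script, for example "hbase org.apache.hadoop.hbase.tool.BulkLoadHFilesTool
/user/alice/hfile-out mytable" (directory and table name are illustrative), or driven programmatically
through ToolRunner, as in this sketch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.util.ToolRunner;

final class RunBulkLoadToolSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Two arguments: the HFileOutputFormat output directory and the target table name.
    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf),
        new String[] { "/user/alice/hfile-out", "mytable" });
    System.exit(ret);
  }
}
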
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
deleted file mode 100644
index ec349fe..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ /dev/null
@@ -1,1282 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.tool;
-
-import static java.lang.String.format;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Optional;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.UUID;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Collectors;
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.conf.Configured;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
-import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.io.HFileLink;
-import org.apache.hadoop.hbase.io.HalfStoreFileReader;
-import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
-import org.apache.hadoop.hbase.io.hfile.CacheConfig;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
-import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSHDFSUtils;
-import org.apache.hadoop.hbase.util.FSVisitor;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-/**
- * Tool to load the output of HFileOutputFormat into an existing table.
- * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
- * rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
- * and {@link #run(String, TableName)}, as all the methods other than them will be
- * removed with no replacement.
- */
-@Deprecated
-@InterfaceAudience.Public
-public class LoadIncrementalHFiles extends Configured implements Tool {
-
- private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);
-
- /**
- * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
- * depend on this value.
- */
- @Deprecated
- public static final String NAME = BulkLoadHFilesTool.NAME;
- static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
- public static final String MAX_FILES_PER_REGION_PER_FAMILY =
- BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
- private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
- public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
- public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
- BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
- public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;
-
- // We use a '.' prefix which is ignored when walking directory trees
- // above. It is an invalid family name.
- static final String TMP_DIR = ".tmp";
-
- private final int maxFilesPerRegionPerFamily;
- private final boolean assignSeqIds;
-
- // Source delegation token
- private final FsDelegationToken fsDelegationToken;
- private final UserProvider userProvider;
- private final int nrThreads;
- private AtomicInteger numRetries;
- private final RpcControllerFactory rpcControllerFactory;
-
- private String bulkToken;
-
- /**
- * Represents an HFile waiting to be loaded. A queue is used in this class to support
- * the case where a region has split during the process of the load. When this happens, the HFile
- * is split into two physical parts across the new region boundary, and each part is added back
- * into the queue. The import process finishes when the queue is empty.
- * @deprecated Use {@link BulkLoadHFiles} instead.
- */
- @InterfaceAudience.Public
- @Deprecated
- public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {
-
- public LoadQueueItem(byte[] family, Path hfilePath) {
- super(family, hfilePath);
- }
- }
-
- public LoadIncrementalHFiles(Configuration conf) {
- // make a copy, just to be sure we're not overriding someone else's config
- super(HBaseConfiguration.create(conf));
- conf = getConf();
- // disable blockcache for tool invocation, see HBASE-10500
- conf.setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0);
- userProvider = UserProvider.instantiate(conf);
- fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
- assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
- maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
- nrThreads = conf.getInt("hbase.loadincremental.threads.max",
- Runtime.getRuntime().availableProcessors());
- numRetries = new AtomicInteger(0);
- rpcControllerFactory = new RpcControllerFactory(conf);
- }
-
- private void usage() {
- System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable"
- + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by "
- + "this tool\n Note: if you set this to 'no', then the target table must already exist "
- + "in HBase\n -loadTable implies your baseDirectory to store file has a depth of 3 ,you"
- + " must have an existing table\n-D" + IGNORE_UNMATCHED_CF_CONF_KEY + "=yes - can be used "
- + "to ignore unmatched column families\n" +
- "\n");
- }
-
- /**
- * Prepare a collection of {@link LoadQueueItem} from the list of source hfiles contained in the
- * passed directory, and validate that the prepared queue only contains column families that
- * exist in the table.
- * @param hfilesDir directory containing the hfiles to be loaded into the table
- * @param table table to which hfiles should be loaded
- * @param queue queue to be populated with the hfiles to load
- * @param validateHFile if true, hfiles will be validated for their format
- * @throws IOException If any I/O or network error occurred
- */
- public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
- boolean validateHFile) throws IOException {
- prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
- }
-
- /**
- * Prepare a collection of {@link LoadQueueItem} from the list of source hfiles contained in the
- * passed directory, and validate that the prepared queue only contains column families that
- * exist in the table.
- * @param hfilesDir directory containing the hfiles to be loaded into the table
- * @param table table to which hfiles should be loaded
- * @param queue queue to be populated with the hfiles to load
- * @param validateHFile if true, hfiles will be validated for their format
- * @param silence true to ignore unmatched column families
- * @throws IOException If any I/O or network error occurred
- */
- public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
- boolean validateHFile, boolean silence) throws IOException {
- discoverLoadQueue(queue, hfilesDir, validateHFile);
- validateFamiliesInHFiles(table, queue, silence);
- }
-
- /**
- * Prepare a collection of {@link LoadQueueItem} from the given map of family to hfile paths, and
- * validate that the prepared queue only contains column families that exist in the table.
- * @param map map of family to List of hfiles
- * @param table table to which hfiles should be loaded
- * @param queue queue to be populated with the hfiles to load
- * @param silence true to ignore unmatched column families
- * @throws IOException If any I/O or network error occurred
- */
- public void prepareHFileQueue(Map<byte[], List<Path>> map, Table table,
- Deque<LoadQueueItem> queue, boolean silence) throws IOException {
- populateLoadQueue(queue, map);
- validateFamiliesInHFiles(table, queue, silence);
- }
-
- /**
- * Perform a bulk load of the given directory into the given pre-existing table. This method is
- * not threadsafe.
- * @param hfofDir the directory that was provided as the output path of a job using
- * HFileOutputFormat
- * @param admin the Admin
- * @param table the table to load into
- * @param regionLocator region locator
- * @throws TableNotFoundException if table does not yet exist
- */
- public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
- RegionLocator regionLocator) throws TableNotFoundException, IOException {
- return doBulkLoad(hfofDir, admin, table, regionLocator, false, false);
- }
-
- /**
- * Perform a bulk load of the given map of family to hfile paths into the given pre-existing
- * table. This method is not threadsafe.
- * @param map map of family to List of hfiles
- * @param admin the Admin
- * @param table the table to load into
- * @param regionLocator region locator
- * @param silence true to ignore unmatched column families
- * @param copyFile always copy hfiles if true
- * @throws TableNotFoundException if table does not yet exist
- */
- public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Map<byte[], List<Path>> map, final Admin admin,
- Table table, RegionLocator regionLocator, boolean silence, boolean copyFile)
- throws TableNotFoundException, IOException {
- if (!admin.isTableAvailable(regionLocator.getName())) {
- throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
- }
- // LQI queue does not need to be threadsafe -- all operations on this queue
- // happen in this thread
- Deque<LoadQueueItem> queue = new ArrayDeque<>();
- ExecutorService pool = null;
- SecureBulkLoadClient secureClient = null;
- try {
- prepareHFileQueue(map, table, queue, silence);
- if (queue.isEmpty()) {
- LOG.warn("Bulk load operation did not get any files to load");
- return Collections.emptyMap();
- }
- pool = createExecutorService();
- secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
- return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
- } finally {
- cleanup(admin, queue, pool, secureClient);
- }
- }
-
- /**
- * Perform a bulk load of the given directory into the given pre-existing table. This method is
- * not threadsafe.
- * @param hfofDir the directory that was provided as the output path of a job using
- * HFileOutputFormat
- * @param admin the Admin
- * @param table the table to load into
- * @param regionLocator region locator
- * @param silence true to ignore unmatched column families
- * @param copyFile always copy hfiles if true
- * @throws TableNotFoundException if table does not yet exist
- */
- public Map<LoadQueueItem, ByteBuffer> doBulkLoad(Path hfofDir, final Admin admin, Table table,
- RegionLocator regionLocator, boolean silence, boolean copyFile)
- throws TableNotFoundException, IOException {
- if (!admin.isTableAvailable(regionLocator.getName())) {
- throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
- }
-
- /*
- * Checking hfile format is a time-consuming operation, we should have an option to skip this
- * step when bulkloading millions of HFiles. See HBASE-13985.
- */
- boolean validateHFile = getConf().getBoolean("hbase.loadincremental.validate.hfile", true);
- if (!validateHFile) {
- LOG.warn("You are skipping HFiles validation, it might cause some data loss if files " +
- "are not correct. If you fail to read data from your table after using this " +
- "option, consider removing the files and bulkload again without this option. " +
- "See HBASE-13985");
- }
- // LQI queue does not need to be threadsafe -- all operations on this queue
- // happen in this thread
- Deque<LoadQueueItem> queue = new ArrayDeque<>();
- ExecutorService pool = null;
- SecureBulkLoadClient secureClient = null;
- try {
- prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
-
- if (queue.isEmpty()) {
- LOG.warn(
- "Bulk load operation did not find any files to load in directory {}. " +
- "Does it contain files in subdirectories that correspond to column family names?",
- (hfofDir != null ? hfofDir.toUri().toString() : ""));
- return Collections.emptyMap();
- }
- pool = createExecutorService();
- secureClient = new SecureBulkLoadClient(table.getConfiguration(), table);
- return performBulkLoad(admin, table, regionLocator, queue, pool, secureClient, copyFile);
- } finally {
- cleanup(admin, queue, pool, secureClient);
- }
- }
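For illustration, a minimal sketch of driving the directory-based overload above; the table name and output path are hypothetical, and the snippet assumes the imports already listed at the top of this file. HFile format validation can be skipped via hbase.loadincremental.validate.hfile when loading very large numbers of files (HBASE-13985).

// Minimal sketch: bulk load an HFileOutputFormat output directory into a pre-created table.
Configuration conf = HBaseConfiguration.create();
// Optional: skip per-file format validation for very large loads (see HBASE-13985).
conf.setBoolean("hbase.loadincremental.validate.hfile", false);
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
try (Connection conn = ConnectionFactory.createConnection(conf);
    Admin admin = conn.getAdmin();
    Table table = conn.getTable(TableName.valueOf("example_table"));
    RegionLocator locator = conn.getRegionLocator(TableName.valueOf("example_table"))) {
  Map<LoadQueueItem, ByteBuffer> loaded =
      loader.doBulkLoad(new Path("/tmp/hfof-output"), admin, table, locator, false, false);
}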
-
- /**
- * Used by the replication sink to load the hfiles from the source cluster. It does the following:
- * <ol>
- * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
- * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
- * </li>
- * </ol>
- * @param table table to which these hfiles should be loaded
- * @param conn connection to use
- * @param queue queue of {@link LoadQueueItem}s holding the hfiles yet to be loaded
- * @param startEndKeys starting and ending row keys of the region
- */
- public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
- Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- loadHFileQueue(table, conn, queue, startEndKeys, false);
- }
-
- /**
- * Used by the replication sink to load the hfiles from the source cluster. It does the following:
- * <ol>
- * <li>LoadIncrementalHFiles#groupOrSplitPhase(Table, ExecutorService, Deque, Pair)</li>
- * <li>LoadIncrementalHFiles#bulkLoadPhase(Table, Connection, ExecutorService, Deque, Multimap)
- * </li>
- * </ol>
- * @param table table to which these hfiles should be loaded
- * @param conn connection to use
- * @param queue queue of {@link LoadQueueItem}s holding the hfiles yet to be loaded
- * @param startEndKeys starting and ending row keys of the region
- * @param copyFile always copy hfiles if true
- */
- public void loadHFileQueue(Table table, Connection conn, Deque<LoadQueueItem> queue,
- Pair<byte[][], byte[][]> startEndKeys, boolean copyFile) throws IOException {
- ExecutorService pool = null;
- try {
- pool = createExecutorService();
- Multimap<ByteBuffer, LoadQueueItem> regionGroups =
- groupOrSplitPhase(table, pool, queue, startEndKeys).getFirst();
- bulkLoadPhase(table, conn, pool, queue, regionGroups, copyFile, null);
- } finally {
- if (pool != null) {
- pool.shutdown();
- }
- }
- }
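A hedged sketch of the lower-level two-phase flow described above, as the replication sink uses it; names and paths are hypothetical, and the imports at the top of this file are assumed.

// Discover and validate the hfiles, then group/split them by region and load.
Configuration conf = HBaseConfiguration.create();
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
try (Connection conn = ConnectionFactory.createConnection(conf);
    Table table = conn.getTable(TableName.valueOf("example_table"));
    RegionLocator locator = conn.getRegionLocator(TableName.valueOf("example_table"))) {
  Deque<LoadQueueItem> queue = new ArrayDeque<>();
  loader.prepareHFileQueue(new Path("/tmp/hfof-output"), table, queue, true);
  loader.loadHFileQueue(table, conn, queue, locator.getStartEndKeys());
}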
-
- private Map<LoadQueueItem, ByteBuffer> performBulkLoad(Admin admin, Table table,
- RegionLocator regionLocator, Deque<LoadQueueItem> queue, ExecutorService pool,
- SecureBulkLoadClient secureClient, boolean copyFile) throws IOException {
- int count = 0;
-
- fsDelegationToken.acquireDelegationToken(queue.peek().getFilePath().getFileSystem(getConf()));
- bulkToken = secureClient.prepareBulkLoad(admin.getConnection());
- Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair = null;
-
- Map<LoadQueueItem, ByteBuffer> item2RegionMap = new HashMap<>();
- // Assumes that region splits can happen while this occurs.
- while (!queue.isEmpty()) {
- // need to reload split keys each iteration.
- final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();
- if (count != 0) {
- LOG.info("Split occurred while grouping HFiles, retry attempt " + +count + " with " +
- queue.size() + " files remaining to group or split");
- }
-
- int maxRetries = getConf().getInt(HConstants.BULKLOAD_MAX_RETRIES_NUMBER, 10);
- maxRetries = Math.max(maxRetries, startEndKeys.getFirst().length + 1);
- if (maxRetries != 0 && count >= maxRetries) {
- throw new IOException(
- "Retry attempted " + count + " times without completing, bailing out");
- }
- count++;
-
- // Using ByteBuffer for byte[] equality semantics
- pair = groupOrSplitPhase(table, pool, queue, startEndKeys);
- Multimap<ByteBuffer, LoadQueueItem> regionGroups = pair.getFirst();
-
- if (!checkHFilesCountPerRegionPerFamily(regionGroups)) {
- // Error is logged inside checkHFilesCountPerRegionPerFamily.
- throw new IOException("Trying to load more than " + maxFilesPerRegionPerFamily +
- " hfiles to one family of one region");
- }
-
- bulkLoadPhase(table, admin.getConnection(), pool, queue, regionGroups, copyFile,
- item2RegionMap);
-
- // NOTE: The next iteration's split / group could happen in parallel to
- // atomic bulkloads assuming that there are splits and no merges, and
- // that we can atomically pull out the groups we want to retry.
- }
-
- if (!queue.isEmpty()) {
- throw new RuntimeException("Bulk load aborted with some files not yet loaded." +
- "Please check log for more details.");
- }
- return item2RegionMap;
- }
-
- /**
- * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
- * re-queued for another pass with the groupOrSplitPhase.
- * <p>
- * protected for testing.
- */
- @VisibleForTesting
- protected void bulkLoadPhase(Table table, Connection conn, ExecutorService pool,
- Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
- boolean copyFile, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
- // atomically bulk load the groups.
- Set<Future<List<LoadQueueItem>>> loadingFutures = new HashSet<>();
- for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> e : regionGroups.asMap()
- .entrySet()) {
- byte[] first = e.getKey().array();
- Collection<LoadQueueItem> lqis = e.getValue();
-
- ClientServiceCallable<byte[]> serviceCallable =
- buildClientServiceCallable(conn, table.getName(), first, lqis, copyFile);
-
- Callable<List<LoadQueueItem>> call = new Callable<List<LoadQueueItem>>() {
- @Override
- public List<LoadQueueItem> call() throws Exception {
- List<LoadQueueItem> toRetry =
- tryAtomicRegionLoad(serviceCallable, table.getName(), first, lqis);
- return toRetry;
- }
- };
- if (item2RegionMap != null) {
- for (LoadQueueItem lqi : lqis) {
- item2RegionMap.put(lqi, e.getKey());
- }
- }
- loadingFutures.add(pool.submit(call));
- }
-
- // get all the results.
- for (Future<List<LoadQueueItem>> future : loadingFutures) {
- try {
- List<LoadQueueItem> toRetry = future.get();
-
- if (item2RegionMap != null) {
- for (LoadQueueItem lqi : toRetry) {
- item2RegionMap.remove(lqi);
- }
- }
- // LQIs that are requeued to be regrouped.
- queue.addAll(toRetry);
-
- } catch (ExecutionException e1) {
- Throwable t = e1.getCause();
- if (t instanceof IOException) {
- // At this point something unrecoverable has happened.
- // TODO Implement bulk load recovery
- throw new IOException("BulkLoad encountered an unrecoverable problem", t);
- }
- LOG.error("Unexpected execution exception during bulk load", e1);
- throw new IllegalStateException(t);
- } catch (InterruptedException e1) {
- LOG.error("Unexpected interrupted exception during bulk load", e1);
- throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
- }
- }
- }
-
- @VisibleForTesting
- protected ClientServiceCallable<byte[]> buildClientServiceCallable(Connection conn,
- TableName tableName, byte[] first, Collection<LoadQueueItem> lqis, boolean copyFile) {
- List<Pair<byte[], String>> famPaths =
- lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
- .collect(Collectors.toList());
- return new ClientServiceCallable<byte[]>(conn, tableName, first,
- rpcControllerFactory.newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected byte[] rpcCall() throws Exception {
- SecureBulkLoadClient secureClient = null;
- boolean success = false;
- try {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Going to connect to server " + getLocation() + " for row " +
- Bytes.toStringBinary(getRow()) + " with hfile group " +
- LoadIncrementalHFiles.this.toString(famPaths));
- }
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(getConf(), table);
- success = secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- assignSeqIds, fsDelegationToken.getUserToken(), bulkToken, copyFile);
- }
- return success ? regionName : null;
- } finally {
- // Best effort copying of files that might not have been imported
- // from the staging directory back to original location
- // in user directory
- if (secureClient != null && !success) {
- FileSystem targetFs = FileSystem.get(getConf());
- FileSystem sourceFs = lqis.iterator().next().getFilePath().getFileSystem(getConf());
- // Check to see if the source and target filesystems are the same
- // If they are the same filesystem, we will try move the files back
- // because previously we moved them to the staging directory.
- if (FSHDFSUtils.isSameHdfs(getConf(), sourceFs, targetFs)) {
- for (Pair<byte[], String> el : famPaths) {
- Path hfileStagingPath = null;
- Path hfileOrigPath = new Path(el.getSecond());
- try {
- hfileStagingPath = new Path(new Path(bulkToken, Bytes.toString(el.getFirst())),
- hfileOrigPath.getName());
- if (targetFs.rename(hfileStagingPath, hfileOrigPath)) {
- LOG.debug("Moved back file " + hfileOrigPath + " from " + hfileStagingPath);
- } else if (targetFs.exists(hfileStagingPath)) {
- LOG.debug(
- "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath);
- }
- } catch (Exception ex) {
- LOG.debug(
- "Unable to move back file " + hfileOrigPath + " from " + hfileStagingPath, ex);
- }
- }
- }
- }
- }
- }
- };
- }
-
- private boolean checkHFilesCountPerRegionPerFamily(
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
- for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
- Map<byte[], MutableInt> filesMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (LoadQueueItem lqi : e.getValue()) {
- MutableInt count = filesMap.computeIfAbsent(lqi.getFamily(), k -> new MutableInt());
- count.increment();
- if (count.intValue() > maxFilesPerRegionPerFamily) {
- LOG.error("Trying to load more than " + maxFilesPerRegionPerFamily +
- " hfiles to family " + Bytes.toStringBinary(lqi.getFamily()) +
- " of region with start key " + Bytes.toStringBinary(e.getKey()));
- return false;
- }
- }
- }
- return true;
- }
-
- /**
- * @param table the table to load into
- * @param pool the ExecutorService
- * @param queue the queue for LoadQueueItem
- * @param startEndKeys start and end keys
- * @return A map that groups LQI by likely bulk load region targets and Set of missing hfiles.
- */
- private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
- final Table table, ExecutorService pool, Deque<LoadQueueItem> queue,
- final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- // <region start key, LQI> need synchronized only within this scope of this
- // phase because of the puts that happen in futures.
- Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
- Set<String> missingHFiles = new HashSet<>();
- Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
- new Pair<>(regionGroups, missingHFiles);
-
- // drain LQIs and figure out bulk load groups
- Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
- while (!queue.isEmpty()) {
- final LoadQueueItem item = queue.remove();
-
- final Callable<Pair<List<LoadQueueItem>, String>> call =
- new Callable<Pair<List<LoadQueueItem>, String>>() {
- @Override
- public Pair<List<LoadQueueItem>, String> call() throws Exception {
- Pair<List<LoadQueueItem>, String> splits =
- groupOrSplit(regionGroups, item, table, startEndKeys);
- return splits;
- }
- };
- splittingFutures.add(pool.submit(call));
- }
- // get all the results. All grouping and splitting must finish before
- // we can attempt the atomic loads.
- for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
- try {
- Pair<List<LoadQueueItem>, String> splits = lqis.get();
- if (splits != null) {
- if (splits.getFirst() != null) {
- queue.addAll(splits.getFirst());
- } else {
- missingHFiles.add(splits.getSecond());
- }
- }
- } catch (ExecutionException e1) {
- Throwable t = e1.getCause();
- if (t instanceof IOException) {
- LOG.error("IOException during splitting", e1);
- throw (IOException) t; // would have been thrown if not parallelized,
- }
- LOG.error("Unexpected execution exception during splitting", e1);
- throw new IllegalStateException(t);
- } catch (InterruptedException e1) {
- LOG.error("Unexpected interrupted exception during splitting", e1);
- throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
- }
- }
- return pair;
- }
-
- private List<LoadQueueItem> splitStoreFile(final LoadQueueItem item, final Table table,
- byte[] startKey, byte[] splitKey) throws IOException {
- Path hfilePath = item.getFilePath();
- byte[] family = item.getFamily();
- Path tmpDir = hfilePath.getParent();
- if (!tmpDir.getName().equals(TMP_DIR)) {
- tmpDir = new Path(tmpDir, TMP_DIR);
- }
-
- LOG.info("HFile at " + hfilePath + " no longer fits inside a single " + "region. Splitting...");
-
- String uniqueName = getUniqueName();
- ColumnFamilyDescriptor familyDesc = table.getDescriptor().getColumnFamily(family);
-
- Path botOut = new Path(tmpDir, uniqueName + ".bottom");
- Path topOut = new Path(tmpDir, uniqueName + ".top");
- splitStoreFile(getConf(), hfilePath, familyDesc, splitKey, botOut, topOut);
-
- FileSystem fs = tmpDir.getFileSystem(getConf());
- fs.setPermission(tmpDir, FsPermission.valueOf("-rwxrwxrwx"));
- fs.setPermission(botOut, FsPermission.valueOf("-rwxrwxrwx"));
- fs.setPermission(topOut, FsPermission.valueOf("-rwxrwxrwx"));
-
- // Add these back at the *front* of the queue, so there's a lower
- // chance that the region will just split again before we get there.
- List<LoadQueueItem> lqis = new ArrayList<>(2);
- lqis.add(new LoadQueueItem(family, botOut));
- lqis.add(new LoadQueueItem(family, topOut));
-
- // If the current item is already the result of previous splits,
- // we don't need it anymore. Clean up to save space.
- // It is not part of the original input files.
- try {
- if (tmpDir.getName().equals(TMP_DIR)) {
- fs.delete(hfilePath, false);
- }
- } catch (IOException e) {
- LOG.warn("Unable to delete temporary split file " + hfilePath);
- }
- LOG.info("Successfully split into new HFiles " + botOut + " and " + topOut);
- return lqis;
- }
-
- /**
- * Attempts to assign the given load queue item to its target region group. If the hfile's key
- * range no longer fits inside a single region, physically splits the hfile so that the new bottom
- * half will fit, and returns the list of LQIs corresponding to the resulting hfiles.
- * <p>
- * protected for testing
- * @throws IOException if an IO failure is encountered
- */
- @VisibleForTesting
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item, final Table table,
- final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- Path hfilePath = item.getFilePath();
- Optional<byte[]> first, last;
- try (HFile.Reader hfr = HFile.createReader(hfilePath.getFileSystem(getConf()), hfilePath,
- CacheConfig.DISABLED, true, getConf())) {
- hfr.loadFileInfo();
- first = hfr.getFirstRowKey();
- last = hfr.getLastRowKey();
- } catch (FileNotFoundException fnfe) {
- LOG.debug("encountered", fnfe);
- return new Pair<>(null, hfilePath.getName());
- }
-
- LOG.info("Trying to load hfile=" + hfilePath + " first=" + first.map(Bytes::toStringBinary) +
- " last=" + last.map(Bytes::toStringBinary));
- if (!first.isPresent() || !last.isPresent()) {
- assert !first.isPresent() && !last.isPresent();
- // TODO what if this is due to a bad HFile?
- LOG.info("hfile " + hfilePath + " has no entries, skipping");
- return null;
- }
- if (Bytes.compareTo(first.get(), last.get()) > 0) {
- throw new IllegalArgumentException("Invalid range: " + Bytes.toStringBinary(first.get()) +
- " > " + Bytes.toStringBinary(last.get()));
- }
- int idx = Arrays.binarySearch(startEndKeys.getFirst(), first.get(), Bytes.BYTES_COMPARATOR);
- if (idx < 0) {
- // not on boundary, returns -(insertion index). Calculate region it
- // would be in.
- idx = -(idx + 1) - 1;
- }
- int indexForCallable = idx;
-
- /**
- * We can consider there to be a region hole in the following conditions: 1) if idx < 0, then the
- * first region info is lost; 2) if the end key of a region is not equal to the start key of the
- * next region; 3) if the end key of the last region is not empty.
- */
- if (indexForCallable < 0) {
- throw new IOException("The first region info for table " + table.getName() +
- " can't be found in hbase:meta.Please use hbck tool to fix it first.");
- } else if ((indexForCallable == startEndKeys.getFirst().length - 1) &&
- !Bytes.equals(startEndKeys.getSecond()[indexForCallable], HConstants.EMPTY_BYTE_ARRAY)) {
- throw new IOException("The last region info for table " + table.getName() +
- " can't be found in hbase:meta.Please use hbck tool to fix it first.");
- } else if (indexForCallable + 1 < startEndKeys.getFirst().length &&
- !(Bytes.compareTo(startEndKeys.getSecond()[indexForCallable],
- startEndKeys.getFirst()[indexForCallable + 1]) == 0)) {
- throw new IOException("The endkey of one region for table " + table.getName() +
- " is not equal to the startkey of the next region in hbase:meta." +
- "Please use hbck tool to fix it first.");
- }
-
- boolean lastKeyInRange = Bytes.compareTo(last.get(), startEndKeys.getSecond()[idx]) < 0 ||
- Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
- if (!lastKeyInRange) {
- List<LoadQueueItem> lqis = splitStoreFile(item, table,
- startEndKeys.getFirst()[indexForCallable], startEndKeys.getSecond()[indexForCallable]);
- return new Pair<>(lqis, null);
- }
-
- // group regions.
- regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
- return null;
- }
-
- /**
- * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
- * hfiles that need to be retried. If it is successful it will return an empty list.
- * <p>
- * NOTE: To maintain row atomicity guarantees, region server callable should succeed atomically
- * and fails atomically.
- * <p>
- * Protected for testing.
- * @return empty list if success, list of items to retry on recoverable failure
- */
- @VisibleForTesting
- protected List<LoadQueueItem> tryAtomicRegionLoad(ClientServiceCallable<byte[]> serviceCallable,
- final TableName tableName, final byte[] first, final Collection<LoadQueueItem> lqis)
- throws IOException {
- List<LoadQueueItem> toRetry = new ArrayList<>();
- try {
- Configuration conf = getConf();
- byte[] region = RpcRetryingCallerFactory.instantiate(conf, null).<byte[]> newCaller()
- .callWithRetries(serviceCallable, Integer.MAX_VALUE);
- if (region == null) {
- LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
- " into table " + tableName + " with files " + lqis +
- " failed. This is recoverable and they will be retried.");
- toRetry.addAll(lqis); // return lqi's to retry
- }
- // success
- return toRetry;
- } catch (IOException e) {
- LOG.error("Encountered unrecoverable error from region server, additional details: " +
- serviceCallable.getExceptionMessageAdditionalDetail(),
- e);
- LOG.warn(
- "Received a " + e.getClass().getSimpleName()
- + " from region server: "
- + serviceCallable.getExceptionMessageAdditionalDetail(), e);
- if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
- && numRetries.get() < getConf().getInt(
- HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
- LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
- + numRetries.incrementAndGet());
- toRetry.addAll(lqis);
- return toRetry;
- }
- LOG.error(RETRY_ON_IO_EXCEPTION + " is disabled. Unable to recover");
- throw e;
- }
- }
-
- /**
- * If the table is created for the first time, then "completebulkload" reads the files twice. More
- * modifications are necessary if we want to avoid doing it.
- */
- private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
- final FileSystem fs = hfofDir.getFileSystem(getConf());
-
- // Add column families
- // Build a set of keys
- List<ColumnFamilyDescriptorBuilder> familyBuilders = new ArrayList<>();
- SortedMap<byte[], Integer> map = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<ColumnFamilyDescriptorBuilder>() {
- @Override
- public ColumnFamilyDescriptorBuilder bulkFamily(byte[] familyName) {
- ColumnFamilyDescriptorBuilder builder =
- ColumnFamilyDescriptorBuilder.newBuilder(familyName);
- familyBuilders.add(builder);
- return builder;
- }
-
- @Override
- public void bulkHFile(ColumnFamilyDescriptorBuilder builder, FileStatus hfileStatus)
- throws IOException {
- Path hfile = hfileStatus.getPath();
- try (HFile.Reader reader =
- HFile.createReader(fs, hfile, CacheConfig.DISABLED, true, getConf())) {
- if (builder.getCompressionType() != reader.getFileContext().getCompression()) {
- builder.setCompressionType(reader.getFileContext().getCompression());
- LOG.info("Setting compression " + reader.getFileContext().getCompression().name() +
- " for family " + builder.getNameAsString());
- }
- reader.loadFileInfo();
- byte[] first = reader.getFirstRowKey().get();
- byte[] last = reader.getLastRowKey().get();
-
- LOG.info("Trying to figure out region boundaries hfile=" + hfile + " first=" +
- Bytes.toStringBinary(first) + " last=" + Bytes.toStringBinary(last));
-
- // To eventually infer start key-end key boundaries
- Integer value = map.containsKey(first) ? map.get(first) : 0;
- map.put(first, value + 1);
-
- value = map.containsKey(last) ? map.get(last) : 0;
- map.put(last, value - 1);
- }
- }
- });
-
- byte[][] keys = inferBoundaries(map);
- TableDescriptorBuilder tdBuilder = TableDescriptorBuilder.newBuilder(tableName);
- familyBuilders.stream().map(ColumnFamilyDescriptorBuilder::build)
- .forEachOrdered(tdBuilder::setColumnFamily);
- admin.createTable(tdBuilder.build(), keys);
-
- LOG.info("Table " + tableName + " is available!!");
- }
-
- private void cleanup(Admin admin, Deque<LoadQueueItem> queue, ExecutorService pool,
- SecureBulkLoadClient secureClient) throws IOException {
- fsDelegationToken.releaseDelegationToken();
- if (bulkToken != null && secureClient != null) {
- secureClient.cleanupBulkLoad(admin.getConnection(), bulkToken);
- }
- if (pool != null) {
- pool.shutdown();
- }
- if (!queue.isEmpty()) {
- StringBuilder err = new StringBuilder();
- err.append("-------------------------------------------------\n");
- err.append("Bulk load aborted with some files not yet loaded:\n");
- err.append("-------------------------------------------------\n");
- for (LoadQueueItem q : queue) {
- err.append(" ").append(q.getFilePath()).append('\n');
- }
- LOG.error(err.toString());
- }
- }
-
- // unique file name for the table
- private String getUniqueName() {
- return UUID.randomUUID().toString().replaceAll("-", "");
- }
-
- /**
- * Checks whether there is any invalid family name in HFiles to be bulk loaded.
- */
- private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
- throws IOException {
- Set<String> familyNames = Arrays.asList(table.getDescriptor().getColumnFamilies()).stream()
- .map(f -> f.getNameAsString()).collect(Collectors.toSet());
- List<String> unmatchedFamilies = queue.stream().map(item -> Bytes.toString(item.getFamily()))
- .filter(fn -> !familyNames.contains(fn)).distinct().collect(Collectors.toList());
- if (unmatchedFamilies.size() > 0) {
- String msg =
- "Unmatched family names found: unmatched family names in HFiles to be bulkloaded: " +
- unmatchedFamilies + "; valid family names of table " + table.getName() + " are: " +
- familyNames;
- LOG.error(msg);
- if (!silence) {
- throw new IOException(msg);
- }
- }
- }
-
- /**
- * Populate the Queue with given HFiles
- */
- private void populateLoadQueue(Deque<LoadQueueItem> ret, Map<byte[], List<Path>> map) {
- map.forEach((k, v) -> v.stream().map(p -> new LoadQueueItem(k, p)).forEachOrdered(ret::add));
- }
-
- /**
- * Walk the given directory for all HFiles, and return a Queue containing all such files.
- */
- private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
- final boolean validateHFile) throws IOException {
- visitBulkHFiles(hfofDir.getFileSystem(getConf()), hfofDir, new BulkHFileVisitor<byte[]>() {
- @Override
- public byte[] bulkFamily(final byte[] familyName) {
- return familyName;
- }
-
- @Override
- public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
- long length = hfile.getLen();
- if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
- HConstants.DEFAULT_MAX_FILE_SIZE)) {
- LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length +
- " bytes can be problematic as it may lead to oversplitting.");
- }
- ret.add(new LoadQueueItem(family, hfile.getPath()));
- }
- }, validateHFile);
- }
-
- private interface BulkHFileVisitor<TFamily> {
-
- TFamily bulkFamily(byte[] familyName) throws IOException;
-
- void bulkHFile(TFamily family, FileStatus hfileStatus) throws IOException;
- }
-
- /**
- * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_" and
- * non-valid hfiles.
- */
- private static <TFamily> void visitBulkHFiles(final FileSystem fs, final Path bulkDir,
- final BulkHFileVisitor<TFamily> visitor) throws IOException {
- visitBulkHFiles(fs, bulkDir, visitor, true);
- }
-
- /**
- * Iterate over the bulkDir hfiles. Skip reference, HFileLink, files starting with "_". Check and
- * skip non-valid hfiles by default, or skip this validation by setting
- * 'hbase.loadincremental.validate.hfile' to false.
- */
- private static <TFamily> void visitBulkHFiles(FileSystem fs, Path bulkDir,
- BulkHFileVisitor<TFamily> visitor, boolean validateHFile) throws IOException {
- FileStatus[] familyDirStatuses = fs.listStatus(bulkDir);
- for (FileStatus familyStat : familyDirStatuses) {
- if (!familyStat.isDirectory()) {
- LOG.warn("Skipping non-directory " + familyStat.getPath());
- continue;
- }
- Path familyDir = familyStat.getPath();
- byte[] familyName = Bytes.toBytes(familyDir.getName());
- // Skip invalid family
- try {
- ColumnFamilyDescriptorBuilder.isLegalColumnFamilyName(familyName);
- } catch (IllegalArgumentException e) {
- LOG.warn("Skipping invalid " + familyStat.getPath());
- continue;
- }
- TFamily family = visitor.bulkFamily(familyName);
-
- FileStatus[] hfileStatuses = fs.listStatus(familyDir);
- for (FileStatus hfileStatus : hfileStatuses) {
- if (!fs.isFile(hfileStatus.getPath())) {
- LOG.warn("Skipping non-file " + hfileStatus);
- continue;
- }
-
- Path hfile = hfileStatus.getPath();
- // Skip "_", reference, HFileLink
- String fileName = hfile.getName();
- if (fileName.startsWith("_")) {
- continue;
- }
- if (StoreFileInfo.isReference(fileName)) {
- LOG.warn("Skipping reference " + fileName);
- continue;
- }
- if (HFileLink.isHFileLink(fileName)) {
- LOG.warn("Skipping HFileLink " + fileName);
- continue;
- }
-
- // Validate HFile Format if needed
- if (validateHFile) {
- try {
- if (!HFile.isHFileFormat(fs, hfile)) {
- LOG.warn("the file " + hfile + " doesn't seems to be an hfile. skipping");
- continue;
- }
- } catch (FileNotFoundException e) {
- LOG.warn("the file " + hfile + " was removed");
- continue;
- }
- }
-
- visitor.bulkHFile(family, hfileStatus);
- }
- }
- }
-
- // Initialize a thread pool
- private ExecutorService createExecutorService() {
- ThreadPoolExecutor pool = new ThreadPoolExecutor(nrThreads, nrThreads, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new ThreadFactoryBuilder().setNameFormat("LoadIncrementalHFiles-%1$d").build());
- pool.allowCoreThreadTimeOut(true);
- return pool;
- }
-
- private final String toString(List<Pair<byte[], String>> list) {
- StringBuilder sb = new StringBuilder();
- sb.append('[');
- list.forEach(p -> {
- sb.append('{').append(Bytes.toStringBinary(p.getFirst())).append(',').append(p.getSecond())
- .append('}');
- });
- sb.append(']');
- return sb.toString();
- }
-
- /**
- * Split a storefile into a top and bottom half, maintaining the metadata, recreating bloom
- * filters, etc.
- */
- @VisibleForTesting
- static void splitStoreFile(Configuration conf, Path inFile, ColumnFamilyDescriptor familyDesc,
- byte[] splitKey, Path bottomOut, Path topOut) throws IOException {
- // Open reader with no block cache, and not in-memory
- Reference topReference = Reference.createTopReference(splitKey);
- Reference bottomReference = Reference.createBottomReference(splitKey);
-
- copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
- copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
- }
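A rough sketch of how the split helper above can be exercised; it is package-private and @VisibleForTesting, so the call only compiles from the same package, and the family, split key and paths are hypothetical.

// Split one oversized HFile at "row-5000" into bottom/top halves, preserving the
// family's compression, block size, bloom filter type and data block encoding.
Configuration conf = HBaseConfiguration.create();
ColumnFamilyDescriptor familyDesc = ColumnFamilyDescriptorBuilder.of("cf");
LoadIncrementalHFiles.splitStoreFile(conf, new Path("/tmp/hfiles/cf/hfile1"), familyDesc,
    Bytes.toBytes("row-5000"), new Path("/tmp/hfiles/cf/.tmp/split.bottom"),
    new Path("/tmp/hfiles/cf/.tmp/split.top"));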
-
- /**
- * Copy half of an HFile into a new HFile.
- */
- private static void copyHFileHalf(Configuration conf, Path inFile, Path outFile,
- Reference reference, ColumnFamilyDescriptor familyDescriptor) throws IOException {
- FileSystem fs = inFile.getFileSystem(conf);
- CacheConfig cacheConf = CacheConfig.DISABLED;
- HalfStoreFileReader halfReader = null;
- StoreFileWriter halfWriter = null;
- try {
- halfReader = new HalfStoreFileReader(fs, inFile, cacheConf, reference, true,
- new AtomicInteger(0), true, conf);
- Map<byte[], byte[]> fileInfo = halfReader.loadFileInfo();
-
- int blocksize = familyDescriptor.getBlocksize();
- Algorithm compression = familyDescriptor.getCompressionType();
- BloomType bloomFilterType = familyDescriptor.getBloomFilterType();
- HFileContext hFileContext = new HFileContextBuilder().withCompression(compression)
- .withChecksumType(HStore.getChecksumType(conf))
- .withBytesPerCheckSum(HStore.getBytesPerChecksum(conf)).withBlockSize(blocksize)
- .withDataBlockEncoding(familyDescriptor.getDataBlockEncoding()).withIncludesTags(true)
- .build();
- halfWriter = new StoreFileWriter.Builder(conf, cacheConf, fs).withFilePath(outFile)
- .withBloomType(bloomFilterType).withFileContext(hFileContext).build();
- HFileScanner scanner = halfReader.getScanner(false, false, false);
- scanner.seekTo();
- do {
- halfWriter.append(scanner.getCell());
- } while (scanner.next());
-
- for (Map.Entry<byte[], byte[]> entry : fileInfo.entrySet()) {
- if (shouldCopyHFileMetaKey(entry.getKey())) {
- halfWriter.appendFileInfo(entry.getKey(), entry.getValue());
- }
- }
- } finally {
- if (halfReader != null) {
- try {
- halfReader.close(cacheConf.shouldEvictOnClose());
- } catch (IOException e) {
- LOG.warn("failed to close hfile reader for " + inFile, e);
- }
- }
- if (halfWriter != null) {
- halfWriter.close();
- }
-
- }
- }
-
- private static boolean shouldCopyHFileMetaKey(byte[] key) {
- // skip encoding to keep hfile meta consistent with data block info, see HBASE-15085
- if (Bytes.equals(key, HFileDataBlockEncoder.DATA_BLOCK_ENCODING)) {
- return false;
- }
-
- return !HFile.isReservedFileInfoKey(key);
- }
-
- private boolean isCreateTable() {
- return "yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"));
- }
-
- private boolean isSilence() {
- return "yes".equalsIgnoreCase(getConf().get(IGNORE_UNMATCHED_CF_CONF_KEY, ""));
- }
-
- private boolean isAlwaysCopyFiles() {
- return getConf().getBoolean(ALWAYS_COPY_FILES, false);
- }
-
- protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
- throws IOException {
- try (Connection connection = ConnectionFactory.createConnection(getConf());
- Admin admin = connection.getAdmin()) {
- if (!admin.tableExists(tableName)) {
- if (isCreateTable()) {
- createTable(tableName, hfofDir, admin);
- } else {
- String errorMsg = format("Table '%s' does not exist.", tableName);
- LOG.error(errorMsg);
- throw new TableNotFoundException(errorMsg);
- }
- }
- try (Table table = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName)) {
- return doBulkLoad(hfofDir, admin, table, locator, isSilence(),
- isAlwaysCopyFiles());
- }
- }
- }
- /**
- * Perform bulk load on the given table.
- * @param hfofDir the directory that was provided as the output path of a job using
- * HFileOutputFormat
- * @param tableName the table to load into
- */
- public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
- throws IOException {
- return run(new Path(hfofDir), tableName);
- }
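This is one of the two methods the deprecation note at the top of the class recommends relying on; a one-line usage sketch with a hypothetical path and table name:

// Programmatic equivalent of:
//   hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles /tmp/hfof-output example_table
Map<LoadQueueItem, ByteBuffer> loaded =
    new LoadIncrementalHFiles(HBaseConfiguration.create()).run("/tmp/hfof-output",
        TableName.valueOf("example_table"));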
-
- /**
- * Perform bulk load on the given table.
- * @param family2Files map of family to List of hfiles
- * @param tableName the table to load into
- */
- public Map<LoadQueueItem, ByteBuffer> run(Map<byte[], List<Path>> family2Files,
- TableName tableName) throws IOException {
- try (Connection connection = ConnectionFactory.createConnection(getConf());
- Admin admin = connection.getAdmin()) {
- if (!admin.tableExists(tableName)) {
- String errorMsg = format("Table '%s' does not exist.", tableName);
- LOG.error(errorMsg);
- throw new TableNotFoundException(errorMsg);
- }
- try (Table table = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName)) {
- return doBulkLoad(family2Files, admin, table, locator, isSilence(), isAlwaysCopyFiles());
- }
- }
- }
-
- @Override
- public int run(String[] args) throws Exception {
- if (args.length != 2 && args.length != 3) {
- usage();
- return -1;
- }
- String dirPath = args[0];
- TableName tableName = TableName.valueOf(args[1]);
-
-
- if (args.length == 2) {
- return !run(dirPath, tableName).isEmpty() ? 0 : -1;
- } else {
- Map<byte[], List<Path>> family2Files = Maps.newHashMap();
- FileSystem fs = FileSystem.get(getConf());
- for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
- FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
- Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
- byte[] familyName = Bytes.toBytes(family);
- if (family2Files.containsKey(familyName)) {
- family2Files.get(familyName).add(path);
- } else {
- family2Files.put(familyName, Lists.newArrayList(path));
- }
- });
- }
- return !run(family2Files, tableName).isEmpty() ? 0 : -1;
- }
-
- }
-
- public static void main(String[] args) throws Exception {
- Configuration conf = HBaseConfiguration.create();
- int ret = ToolRunner.run(conf, new LoadIncrementalHFiles(conf), args);
- System.exit(ret);
- }
-
- /**
- * Called from the replication sink, which manages the bulkToken (staging directory) by itself.
- * This is used only when SecureBulkLoadEndpoint is configured in the
- * hbase.coprocessor.region.classes property. The staging directory is a temporary directory into
- * which all files are initially copied/moved from the user-given directory and given the required
- * file permissions, and from which they are finally loaded into the table. Set this only if you
- * want to manage the staging directory yourself; otherwise this tool will handle it.
- * @param stagingDir staging directory path
- */
- public void setBulkToken(String stagingDir) {
- this.bulkToken = stagingDir;
- }
-
- /**
- * Infers region boundaries for a new table.
- * <p>
- * Parameter: <br>
- * bdryMap is a map from keys to an integer belonging to {+1, -1}
- * <ul>
- * <li>If a key is a start key of a file, then it maps to +1</li>
- * <li>If a key is an end key of a file, then it maps to -1</li>
- * </ul>
- * <p>
- * Algo:<br>
- * <ol>
- * <li>Poll on the keys in order:
- * <ol type="a">
- * <li>Keep adding the mapped values to these keys (runningSum)</li>
- * <li>Each time runningSum returns to 0, add the start key recorded when that run began to the
- * boundary list.</li>
- * </ol>
- * </li>
- * <li>Return the boundary list.</li>
- * </ol>
- */
- public static byte[][] inferBoundaries(SortedMap<byte[], Integer> bdryMap) {
- List<byte[]> keysArray = new ArrayList<>();
- int runningValue = 0;
- byte[] currStartKey = null;
- boolean firstBoundary = true;
-
- for (Map.Entry<byte[], Integer> item : bdryMap.entrySet()) {
- if (runningValue == 0) {
- currStartKey = item.getKey();
- }
- runningValue += item.getValue();
- if (runningValue == 0) {
- if (!firstBoundary) {
- keysArray.add(currStartKey);
- }
- firstBoundary = false;
- }
- }
-
- return keysArray.toArray(new byte[0][]);
- }
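A small worked example of the inference above, with hypothetical keys:

// Two non-overlapping HFiles, one spanning [a, c] and one spanning [d, f], give the map
// a -> +1, c -> -1, d -> +1, f -> -1. runningSum returns to 0 at c (first boundary, skipped)
// and at f (adds that run's start key), so the inferred split keys are { d }, i.e. regions
// [..., d) and [d, ...).
SortedMap<byte[], Integer> bdryMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
bdryMap.put(Bytes.toBytes("a"), 1);
bdryMap.put(Bytes.toBytes("c"), -1);
bdryMap.put(Bytes.toBytes("d"), 1);
bdryMap.put(Bytes.toBytes("f"), -1);
byte[][] splitKeys = inferBoundaries(bdryMap); // returns { Bytes.toBytes("d") }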
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index ac2d8e1..30314b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -126,6 +126,7 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes.ByteArrayComparator;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter.ERROR_CODE;
import org.apache.hadoop.hbase.util.hbck.HFileCorruptionChecker;
@@ -3562,14 +3563,12 @@ public class HBaseFsck extends Configured implements Closeable {
}
public void dumpSidelinedRegions(Map<Path, HbckInfo> regions) {
- for (Map.Entry<Path, HbckInfo> entry: regions.entrySet()) {
+ for (Map.Entry<Path, HbckInfo> entry : regions.entrySet()) {
TableName tableName = entry.getValue().getTableName();
Path path = entry.getKey();
- errors.print("This sidelined region dir should be bulk loaded: "
- + path.toString());
- errors.print("Bulk load command looks like: "
- + "hbase org.apache.hadoop.hbase.tool.LoadIncrementalHFiles "
- + path.toUri().getPath() + " "+ tableName);
+ errors.print("This sidelined region dir should be bulk loaded: " + path.toString());
+ errors.print("Bulk load command looks like: " + BulkLoadHFilesTool.NAME + " " +
+ path.toUri().getPath() + " " + tableName);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
new file mode 100644
index 0000000..d5fc58e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.security.token.Token;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
+/**
+ * Can be overridden in UT if you only want to implement part of the methods in
+ * {@link AsyncClusterConnection}.
+ */
+public class DummyAsyncClusterConnection implements AsyncClusterConnection {
+
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public AsyncTableRegionLocator getRegionLocator(TableName tableName) {
+ return null;
+ }
+
+ @Override
+ public void clearRegionLocationCache() {
+ }
+
+ @Override
+ public AsyncTableBuilder<AdvancedScanResultConsumer> getTableBuilder(TableName tableName) {
+ return null;
+ }
+
+ @Override
+ public AsyncTableBuilder<ScanResultConsumer> getTableBuilder(TableName tableName,
+ ExecutorService pool) {
+ return null;
+ }
+
+ @Override
+ public AsyncAdminBuilder getAdminBuilder() {
+ return null;
+ }
+
+ @Override
+ public AsyncAdminBuilder getAdminBuilder(ExecutorService pool) {
+ return null;
+ }
+
+ @Override
+ public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName) {
+ return null;
+ }
+
+ @Override
+ public AsyncBufferedMutatorBuilder getBufferedMutatorBuilder(TableName tableName,
+ ExecutorService pool) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Hbck> getHbck() {
+ return null;
+ }
+
+ @Override
+ public Hbck getHbck(ServerName masterServer) throws IOException {
+ return null;
+ }
+
+ @Override
+ public boolean isClosed() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ }
+
+ @Override
+ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+ return null;
+ }
+
+ @Override
+ public NonceGenerator getNonceGenerator() {
+ return null;
+ }
+
+ @Override
+ public RpcClient getRpcClient() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+ boolean writeFlushWALMarker) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
+ List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+ boolean reload) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Boolean> bulkLoad(TableName tableName,
+ List<Pair<byte[], String>> familyPaths, byte[] row, boolean assignSeqNum, Token<?> userToken,
+ String bulkToken, boolean copyFiles) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> cleanupBulkLoad(TableName tableName, String bulkToken) {
+ return null;
+ }
+}
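As the class javadoc suggests, a test can subclass this and override only what it needs; a minimal sketch with a made-up token value:

// Anonymous subclass stubbing just the bulk-load preparation hook for a unit test.
AsyncClusterConnection conn = new DummyAsyncClusterConnection() {
  @Override
  public CompletableFuture<String> prepareBulkLoad(TableName tableName) {
    return CompletableFuture.completedFuture("test-bulk-token");
  }
};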
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java
new file mode 100644
index 0000000..e9ae25d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncRegistry.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+
+/**
+ * Can be overridden in UT if you only want to implement part of the methods in
+ * {@link AsyncRegistry}.
+ */
+public class DummyAsyncRegistry implements AsyncRegistry {
+
+ public static final String REGISTRY_IMPL_CONF_KEY = AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY;
+
+ @Override
+ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<String> getClusterId() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Integer> getCurrentNrHRS() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<ServerName> getMasterAddress() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Integer> getMasterInfoPort() {
+ return null;
+ }
+
+ @Override
+ public void close() {
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
new file mode 100644
index 0000000..2e9bb74
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncTable.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.RpcChannel;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.TableName;
+
+/**
+ * Can be overridden in UT if you only want to implement part of the methods in {@link AsyncTable}.
+ */
+public class DummyAsyncTable<C extends ScanResultConsumerBase> implements AsyncTable<C> {
+
+ @Override
+ public TableName getName() {
+ return null;
+ }
+
+ @Override
+ public Configuration getConfiguration() {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<TableDescriptor> getDescriptor() {
+ return null;
+ }
+
+ @Override
+ public AsyncTableRegionLocator getRegionLocator() {
+ return null;
+ }
+
+ @Override
+ public long getRpcTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public long getReadRpcTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public long getWriteRpcTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public long getOperationTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public long getScanTimeout(TimeUnit unit) {
+ return 0;
+ }
+
+ @Override
+ public CompletableFuture<Result> get(Get get) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> put(Put put) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> delete(Delete delete) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Result> append(Append append) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Result> increment(Increment increment) {
+ return null;
+ }
+
+ @Override
+ public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> mutateRow(RowMutations mutation) {
+ return null;
+ }
+
+ @Override
+ public void scan(Scan scan, C consumer) {
+ }
+
+ @Override
+ public ResultScanner getScanner(Scan scan) {
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<List<Result>> scanAll(Scan scan) {
+ return null;
+ }
+
+ @Override
+ public List<CompletableFuture<Result>> get(List<Get> gets) {
+ return null;
+ }
+
+ @Override
+ public List<CompletableFuture<Void>> put(List<Put> puts) {
+ return null;
+ }
+
+ @Override
+ public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+ return null;
+ }
+
+ @Override
+ public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+ return null;
+ }
+
+ @Override
+ public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> stubMaker,
+ ServiceCaller<S, R> callable, byte[] row) {
+ return null;
+ }
+
+ @Override
+ public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
+ Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
+ CoprocessorCallback<R> callback) {
+ return null;
+ }
+}
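
A minimal sketch, not part of the patch, of the intended use: subclass DummyAsyncTable in a unit test and override only the call the test exercises, leaving every other AsyncTable method on its dummy default (the class name and the canned empty Result below are illustrative):

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.DummyAsyncTable;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;

public class DummyAsyncTableExample {
  // Stub only the call under test; everything else keeps the no-op default
  // inherited from DummyAsyncTable.
  static AsyncTable<AdvancedScanResultConsumer> cannedGetTable() {
    return new DummyAsyncTable<AdvancedScanResultConsumer>() {
      @Override
      public CompletableFuture<Result> get(Get get) {
        // Hand back an empty Result instead of touching a real cluster.
        return CompletableFuture.completedFuture(Result.EMPTY_RESULT);
      }
    };
  }
}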
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
index 987ac7e..d53353e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicaWithCluster.java
@@ -18,10 +18,12 @@
package org.apache.hadoop.hbase.client;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -45,15 +47,14 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -472,40 +473,17 @@ public class TestReplicaWithCluster {
final int numRows = 10;
final byte[] qual = Bytes.toBytes("qual");
final byte[] val = Bytes.toBytes("val");
- final List<Pair<byte[], String>> famPaths = new ArrayList<>();
+ Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (HColumnDescriptor col : hdt.getColumnFamilies()) {
Path hfile = new Path(dir, col.getNameAsString());
- TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(),
- qual, val, numRows);
- famPaths.add(new Pair<>(col.getName(), hfile.toString()));
+ TestHRegionServerBulkLoad.createHFile(HTU.getTestFileSystem(), hfile, col.getName(), qual,
+ val, numRows);
+ family2Files.put(col.getName(), Collections.singletonList(hfile));
}
// bulk load HFiles
LOG.debug("Loading test data");
- final ClusterConnection conn = (ClusterConnection) HTU.getAdmin().getConnection();
- table = conn.getTable(hdt.getTableName());
- final String bulkToken =
- new SecureBulkLoadClient(HTU.getConfiguration(), table).prepareBulkLoad(conn);
- ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
- hdt.getTableName(), TestHRegionServerBulkLoad.rowkey(0),
- new RpcControllerFactory(HTU.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected Void rpcCall() throws Exception {
- LOG.debug("Going to connect to server " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()));
- SecureBulkLoadClient secureClient = null;
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(HTU.getConfiguration(), table);
- secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- true, null, bulkToken);
- }
- return null;
- }
- };
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(HTU.getConfiguration());
- RpcRetryingCaller<Void> caller = factory.newCaller();
- caller.callWithRetries(callable, 10000);
+ BulkLoadHFiles.create(HTU.getConfiguration()).bulkLoad(hdt.getTableName(), family2Files);
// verify we can read them from the primary
LOG.debug("Verifying data load");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
index 6934c98..a90f4e9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java
@@ -72,7 +72,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTrack
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
@@ -567,7 +567,7 @@ public class TestRegionObserverInterface {
createHFile(util.getConfiguration(), fs, new Path(familyDir, Bytes.toString(A)), A, A);
// Bulk load
- new LoadIncrementalHFiles(conf).doBulkLoad(dir, util.getAdmin(), table, locator);
+ BulkLoadHFiles.create(conf).bulkLoad(tableName, dir);
verifyMethodResult(SimpleRegionObserver.class,
new String[] { "hadPreBulkLoadHFile", "hadPostBulkLoadHFile" }, tableName,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
index 40cd540..84463b3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/SpaceQuotaHelperForTests.java
@@ -22,41 +22,38 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.HStoreFile;
import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
@@ -378,39 +375,21 @@ public class SpaceQuotaHelperForTests {
/**
* Bulk-loads a number of files with a number of rows to the given table.
*/
- ClientServiceCallable<Boolean> generateFileToLoad(
- TableName tn, int numFiles, int numRowsPerFile) throws Exception {
- Connection conn = testUtil.getConnection();
+ Map<byte[], List<Path>> generateFileToLoad(TableName tn, int numFiles, int numRowsPerFile)
+ throws Exception {
FileSystem fs = testUtil.getTestFileSystem();
- Configuration conf = testUtil.getConfiguration();
Path baseDir = new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files");
fs.mkdirs(baseDir);
- final List<Pair<byte[], String>> famPaths = new ArrayList<>();
+ List<Path> hfiles = new ArrayList<>();
for (int i = 1; i <= numFiles; i++) {
Path hfile = new Path(baseDir, "file" + i);
- TestHRegionServerBulkLoad.createHFile(
- fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("my"),
- Bytes.toBytes("file"), numRowsPerFile);
- famPaths.add(new Pair<>(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfile.toString()));
+ TestHRegionServerBulkLoad.createHFile(fs, hfile, Bytes.toBytes(SpaceQuotaHelperForTests.F1),
+ Bytes.toBytes("my"), Bytes.toBytes("file"), numRowsPerFile);
+ hfiles.add(hfile);
}
-
- // bulk load HFiles
- Table table = conn.getTable(tn);
- final String bulkToken = new SecureBulkLoadClient(conf, table).prepareBulkLoad(conn);
- return new ClientServiceCallable<Boolean>(
- conn, tn, Bytes.toBytes("row"), new RpcControllerFactory(conf).newController(),
- HConstants.PRIORITY_UNSET) {
- @Override
- public Boolean rpcCall() throws Exception {
- SecureBulkLoadClient secureClient = null;
- byte[] regionName = getLocation().getRegion().getRegionName();
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(conf, table);
- return secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- true, null, bulkToken);
- }
- }
- };
+ Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ family2Files.put(Bytes.toBytes(SpaceQuotaHelperForTests.F1), hfiles);
+ return family2Files;
}
/**
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
index fdc7ad3..9e3dd58 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestLowLatencySpaceQuotas.java
@@ -17,12 +17,13 @@
package org.apache.hadoop.hbase.quotas;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -31,11 +32,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.quotas.SpaceQuotaHelperForTests.SpaceQuotaSnapshotPredicate;
@@ -43,6 +41,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -214,7 +213,7 @@ public class TestLowLatencySpaceQuotas {
tn, SpaceQuotaHelperForTests.ONE_GIGABYTE, SpaceViolationPolicy.NO_INSERTS);
admin.setQuota(settings);
- ClientServiceCallable<Boolean> callable = helper.generateFileToLoad(tn, 3, 550);
+ Map<byte[], List<Path>> family2Files = helper.generateFileToLoad(tn, 3, 550);
// Make sure the files are about as long as we expect
FileSystem fs = TEST_UTIL.getTestFileSystem();
FileStatus[] files = fs.listStatus(
@@ -228,13 +227,13 @@ public class TestLowLatencySpaceQuotas {
totalSize += file.getLen();
}
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
- RpcRetryingCaller<Boolean> caller = factory.<Boolean> newCaller();
- assertTrue("The bulk load failed", caller.callWithRetries(callable, Integer.MAX_VALUE));
+ assertFalse("The bulk load failed",
+ BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tn, family2Files).isEmpty());
final long finalTotalSize = totalSize;
TEST_UTIL.waitFor(30 * 1000, 500, new SpaceQuotaSnapshotPredicate(conn, tn) {
- @Override boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
+ @Override
+ boolean evaluate(SpaceQuotaSnapshot snapshot) throws Exception {
return snapshot.getUsage() >= finalTotalSize;
}
});
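
The assertFalse change above leans on the new API's return value: bulkLoad hands back the items it actually loaded (a map keyed by LoadQueueItem, going by the signatures elsewhere in this patch), so an empty result means nothing was loaded. A fragment restating that check in the test's context, reusing tn, family2Files and the test configuration:

Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded =
    BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tn, family2Files);
// Non-empty means at least one HFile was accepted by a region server.
assertFalse("The bulk load failed", loaded.isEmpty());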
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
index 05ee68a..fca5453 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestSpaceQuotas.java
@@ -17,13 +17,17 @@
*/
package org.apache.hadoop.hbase.quotas;
+import static org.hamcrest.CoreMatchers.instanceOf;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicLong;
@@ -38,7 +42,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Increment;
@@ -47,8 +50,6 @@ import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.master.HMaster;
@@ -56,6 +57,7 @@ import org.apache.hadoop.hbase.quotas.policies.DefaultViolationPolicyEnforcement
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
import org.junit.AfterClass;
@@ -237,19 +239,18 @@ public class TestSpaceQuotas {
@Test
public void testNoBulkLoadsWithNoWrites() throws Exception {
Put p = new Put(Bytes.toBytes("to_reject"));
- p.addColumn(
- Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"), Bytes.toBytes("reject"));
+ p.addColumn(Bytes.toBytes(SpaceQuotaHelperForTests.F1), Bytes.toBytes("to"),
+ Bytes.toBytes("reject"));
TableName tableName = writeUntilViolationAndVerifyViolation(SpaceViolationPolicy.NO_WRITES, p);
// The table is now in violation. Try to do a bulk load
- ClientServiceCallable<Boolean> callable = helper.generateFileToLoad(tableName, 1, 50);
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
- RpcRetryingCaller<Boolean> caller = factory.newCaller();
+ Map<byte[], List<Path>> family2Files = helper.generateFileToLoad(tableName, 1, 50);
try {
- caller.callWithRetries(callable, Integer.MAX_VALUE);
+ BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
fail("Expected the bulk load call to fail!");
- } catch (SpaceLimitingException e) {
+ } catch (IOException e) {
// Pass
+ assertThat(e.getCause(), instanceOf(SpaceLimitingException.class));
LOG.trace("Caught expected exception", e);
}
}
@@ -293,7 +294,7 @@ public class TestSpaceQuotas {
enforcement instanceof DefaultViolationPolicyEnforcement);
// Should generate two files, each of which is over 25KB each
- ClientServiceCallable<Boolean> callable = helper.generateFileToLoad(tn, 2, 525);
+ Map<byte[], List<Path>> family2Files = helper.generateFileToLoad(tn, 2, 525);
FileSystem fs = TEST_UTIL.getTestFileSystem();
FileStatus[] files = fs.listStatus(
new Path(fs.getHomeDirectory(), testName.getMethodName() + "_files"));
@@ -305,13 +306,12 @@ public class TestSpaceQuotas {
LOG.debug(file.getPath() + " -> " + file.getLen() +"B");
}
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(TEST_UTIL.getConfiguration());
- RpcRetryingCaller<Boolean> caller = factory.newCaller();
try {
- caller.callWithRetries(callable, Integer.MAX_VALUE);
+ BulkLoadHFiles.create(TEST_UTIL.getConfiguration()).bulkLoad(tn, family2Files);
fail("Expected the bulk load call to fail!");
- } catch (SpaceLimitingException e) {
+ } catch (IOException e) {
// Pass
+ assertThat(e.getCause(), instanceOf(SpaceLimitingException.class));
LOG.trace("Caught expected exception", e);
}
// Verify that we have no data in the table because neither file should have been
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
index c86f3e1..fd02cf4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
@@ -25,9 +25,11 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -53,7 +55,6 @@ import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.SecureBulkLoadClient;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
@@ -71,8 +72,8 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.TestWALActionsListener;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
@@ -204,59 +205,37 @@ public class TestHRegionServerBulkLoad {
// create HFiles for different column families
FileSystem fs = UTIL.getTestFileSystem();
byte[] val = Bytes.toBytes(String.format("%010d", iteration));
- final List<Pair<byte[], String>> famPaths = new ArrayList<>(NUM_CFS);
+ Map<byte[], List<Path>> family2Files = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (int i = 0; i < NUM_CFS; i++) {
Path hfile = new Path(dir, family(i));
byte[] fam = Bytes.toBytes(family(i));
createHFile(fs, hfile, fam, QUAL, val, 1000);
- famPaths.add(new Pair<>(fam, hfile.toString()));
+ family2Files.put(fam, Collections.singletonList(hfile));
}
-
// bulk load HFiles
- final ClusterConnection conn = (ClusterConnection)UTIL.getConnection();
- Table table = conn.getTable(tableName);
- final String bulkToken = new SecureBulkLoadClient(UTIL.getConfiguration(), table).
- prepareBulkLoad(conn);
- ClientServiceCallable<Void> callable = new ClientServiceCallable<Void>(conn,
- tableName, Bytes.toBytes("aaa"),
- new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
- @Override
- public Void rpcCall() throws Exception {
- LOG.debug("Going to connect to server " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()));
- SecureBulkLoadClient secureClient = null;
- byte[] regionName = getLocation().getRegionInfo().getRegionName();
- try (Table table = conn.getTable(getTableName())) {
- secureClient = new SecureBulkLoadClient(UTIL.getConfiguration(), table);
- secureClient.secureBulkLoadHFiles(getStub(), famPaths, regionName,
- true, null, bulkToken);
- }
- return null;
- }
- };
- RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
- RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
- caller.callWithRetries(callable, Integer.MAX_VALUE);
-
+ BulkLoadHFiles.create(UTIL.getConfiguration()).bulkLoad(tableName, family2Files);
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 5 == 0) {
+ RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
// 5 * 50 = 250 open file handles!
- callable = new ClientServiceCallable<Void>(conn,
- tableName, Bytes.toBytes("aaa"),
- new RpcControllerFactory(UTIL.getConfiguration()).newController(), HConstants.PRIORITY_UNSET) {
- @Override
- protected Void rpcCall() throws Exception {
- LOG.debug("compacting " + getLocation() + " for row "
- + Bytes.toStringBinary(getRow()));
- AdminProtos.AdminService.BlockingInterface server =
- conn.getAdmin(getLocation().getServerName());
- CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
+ ClientServiceCallable<Void> callable =
+ new ClientServiceCallable<Void>(UTIL.getConnection(), tableName, Bytes.toBytes("aaa"),
+ new RpcControllerFactory(UTIL.getConfiguration()).newController(),
+ HConstants.PRIORITY_UNSET) {
+ @Override
+ protected Void rpcCall() throws Exception {
+ LOG.debug(
+ "compacting " + getLocation() + " for row " + Bytes.toStringBinary(getRow()));
+ AdminProtos.AdminService.BlockingInterface server =
+ ((ClusterConnection) UTIL.getConnection()).getAdmin(getLocation().getServerName());
+ CompactRegionRequest request = RequestConverter.buildCompactRegionRequest(
getLocation().getRegionInfo().getRegionName(), true, null);
- server.compactRegion(null, request);
- numCompactions.incrementAndGet();
- return null;
- }
- };
+ server.compactRegion(null, request);
+ numCompactions.incrementAndGet();
+ return null;
+ }
+ };
caller.callWithRetries(callable, Integer.MAX_VALUE);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index f30f084..1fe1f3f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
@@ -45,7 +44,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -94,10 +93,7 @@ public class TestScannerWithBulkload {
false);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
- final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
- try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
- bulkload.doBulkLoad(hfilePath, admin, table, locator);
- }
+ BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
ResultScanner scanner = table.getScanner(scan);
Result result = scanner.next();
result = scanAfterBulkLoad(scanner, result, "version2");
@@ -233,7 +229,7 @@ public class TestScannerWithBulkload {
"/temp/testBulkLoadWithParallelScan/col/file", false);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
- final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+ final BulkLoadHFiles bulkload = BulkLoadHFiles.create(conf);
ResultScanner scanner = table.getScanner(scan);
Result result = scanner.next();
// Create a scanner and then do bulk load
@@ -246,9 +242,7 @@ public class TestScannerWithBulkload {
put1.add(new KeyValue(Bytes.toBytes("row5"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
Bytes.toBytes("version0")));
table.put(put1);
- try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
- bulkload.doBulkLoad(hfilePath, admin, table, locator);
- }
+ bulkload.bulkLoad(tableName, hfilePath);
latch.countDown();
} catch (TableNotFoundException e) {
} catch (IOException e) {
@@ -276,10 +270,7 @@ public class TestScannerWithBulkload {
"/temp/testBulkLoadNativeHFile/col/file", true);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
- final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
- try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(tableName)) {
- bulkload.doBulkLoad(hfilePath, admin, table, locator);
- }
+ BulkLoadHFiles.create(conf).bulkLoad(tableName, hfilePath);
ResultScanner scanner = table.getScanner(scan);
Result result = scanner.next();
// We had 'version0', 'version1' for 'row1,col:q' in the table.
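
The Path-based overload used throughout this test takes a directory rather than a family map; it expects the usual HFileOutputFormat2 layout of one subdirectory per column family. A minimal sketch with a made-up output directory:

// Layout produced by HFileOutputFormat2: <outputDir>/<family>/<hfile>
Path outputDir = new Path("/tmp/hfof-output");
// Discovers the family subdirectories under outputDir and loads them into the
// table; no Admin, Table or RegionLocator handle is needed with the new API.
BulkLoadHFiles.create(conf).bulkLoad(tableName, outputDir);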
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
index eb25806..5c73d07 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSecureBulkLoadManager.java
@@ -21,10 +21,8 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Deque;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@@ -32,8 +30,8 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
@@ -47,7 +45,7 @@ import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads;
@@ -59,6 +57,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
@@ -178,7 +177,7 @@ public class TestSecureBulkLoadManager {
/**
* A trick is used to make sure server-side failures( if any ) not being covered up by a client
- * retry. Since LoadIncrementalHFiles.doBulkLoad keeps performing bulkload calls as long as the
+ * retry. Since BulkLoadHFilesTool.bulkLoad keeps performing bulkload calls as long as the
* HFile queue is not empty, while server-side exceptions in the doAs block do not lead
* to a client exception, a bulkload will always succeed in this case by default, thus client
* will never be aware that failures have ever happened . To avoid this kind of retry ,
@@ -187,23 +186,23 @@ public class TestSecureBulkLoadManager {
* once, and server-side failures, if any ,can be checked via data.
*/
class MyExceptionToAvoidRetry extends DoNotRetryIOException {
+
+ private static final long serialVersionUID = -6802760664998771151L;
}
private void doBulkloadWithoutRetry(Path dir) throws Exception {
- Connection connection = testUtil.getConnection();
- LoadIncrementalHFiles h = new LoadIncrementalHFiles(conf) {
+ BulkLoadHFilesTool h = new BulkLoadHFilesTool(conf) {
+
@Override
- protected void bulkLoadPhase(final Table htable, final Connection conn,
- ExecutorService pool, Deque<LoadQueueItem> queue,
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
- Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
- super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
+ protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
+ Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
throw new MyExceptionToAvoidRetry(); // throw exception to avoid retry
}
};
try {
- h.doBulkLoad(dir, testUtil.getAdmin(), connection.getTable(TABLE),
- connection.getRegionLocator(TABLE));
+ h.bulkLoad(TABLE, dir);
Assert.fail("MyExceptionToAvoidRetry is expected");
} catch (MyExceptionToAvoidRetry e) { //expected
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 37ca7dc..f546058 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -67,7 +67,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -622,9 +622,7 @@ public class TestMasterReplication {
Table source = tables[masterNumber];
final TableName tableName = source.getName();
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
- String[] args = { dir.toString(), tableName.toString() };
- loader.run(args);
+ BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);
if (toValidate) {
for (int slaveClusterNumber : slaveNumbers) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index 2d6c28f..eb3a7a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -275,7 +275,7 @@ public class TestReplicationSink {
}
@Test
- public void testRethrowRetriesExhaustedWithDetailsException() throws Exception {
+ public void testRethrowRetriesExhaustedException() throws Exception {
TableName notExistTable = TableName.valueOf("notExistTable");
List<WALEntry> entries = new ArrayList<>();
List<Cell> cells = new ArrayList<>();
@@ -300,7 +300,7 @@ public class TestReplicationSink {
SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
replicationClusterId, baseNamespaceDir, hfileArchiveDir);
Assert.fail("Should re-throw RetriesExhaustedWithDetailsException.");
- } catch (RetriesExhaustedWithDetailsException e) {
+ } catch (RetriesExhaustedException e) {
} finally {
admin.enableTable(TABLE_NAME1);
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index ff46a98..41d4f46 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -21,47 +21,31 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilder;
import org.apache.hadoop.hbase.CellBuilderFactory;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.BufferedMutatorParams;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection;
+import org.apache.hadoop.hbase.client.DummyAsyncRegistry;
+import org.apache.hadoop.hbase.client.DummyAsyncTable;
import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.client.RowMutations;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableBuilder;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.coprocessor.Batch;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@@ -81,15 +65,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
/**
* Simple test of sink-side wal entry filter facility.
*/
-@Category({ReplicationTests.class, SmallTests.class})
+@Category({ ReplicationTests.class, SmallTests.class })
public class TestWALEntrySinkFilter {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class);
+ HBaseClassTestRule.forClass(TestWALEntrySinkFilter.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationSink.class);
- @Rule public TestName name = new TestName();
+ @Rule
+ public TestName name = new TestName();
static final int BOUNDARY = 5;
static final AtomicInteger UNFILTERED = new AtomicInteger();
static final AtomicInteger FILTERED = new AtomicInteger();
@@ -113,55 +98,48 @@ public class TestWALEntrySinkFilter {
};
/**
- * Test filter.
- * Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many items we
- * filter out and we count how many cells make it through for distribution way down below in the
- * Table#batch implementation. Puts in place a custom DevNullConnection so we can insert our
- * counting Table.
+ * Test filter. Filter will filter out any write time that is <= 5 (BOUNDARY). We count how many
+ * items we filter out and we count how many cells make it through for distribution way down below
+ * in the Table#batch implementation. Puts in place a custom DevNullConnection so we can insert
+ * our counting Table.
* @throws IOException
*/
@Test
public void testWALEntryFilter() throws IOException {
Configuration conf = HBaseConfiguration.create();
// Make it so our filter is instantiated on construction of ReplicationSink.
+ conf.setClass(DummyAsyncRegistry.REGISTRY_IMPL_CONF_KEY, DevNullAsyncRegistry.class,
+ DummyAsyncRegistry.class);
conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
- IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
- conf.setClass("hbase.client.connection.impl", DevNullConnection.class,
- Connection.class);
+ IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
+ conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
+ DevNullAsyncClusterConnection.class, AsyncClusterConnection.class);
ReplicationSink sink = new ReplicationSink(conf, STOPPABLE);
// Create some dumb walentries.
- List< org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry > entries =
- new ArrayList<>();
+ List<AdminProtos.WALEntry> entries = new ArrayList<>();
AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
// Need a tablename.
ByteString tableName =
- ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
+ ByteString.copyFromUtf8(TableName.valueOf(this.name.getMethodName()).toString());
// Add WALEdit Cells to Cells List. The way edits arrive at the sink is with protos
// describing the edit with all Cells from all edits aggregated in a single CellScanner.
final List<Cell> cells = new ArrayList<>();
int count = BOUNDARY * 2;
- for(int i = 0; i < count; i++) {
- byte [] bytes = Bytes.toBytes(i);
+ for (int i = 0; i < count; i++) {
+ byte[] bytes = Bytes.toBytes(i);
// Create a wal entry. Everything is set to the current index as bytes or int/long.
entryBuilder.clear();
- entryBuilder.setKey(entryBuilder.getKeyBuilder().
- setLogSequenceNumber(i).
- setEncodedRegionName(ByteString.copyFrom(bytes)).
- setWriteTime(i).
- setTableName(tableName).build());
+ entryBuilder.setKey(entryBuilder.getKeyBuilder().setLogSequenceNumber(i)
+ .setEncodedRegionName(ByteString.copyFrom(bytes)).setWriteTime(i).setTableName(tableName)
+ .build());
// Lets have one Cell associated with each WALEdit.
entryBuilder.setAssociatedCellCount(1);
entries.add(entryBuilder.build());
// We need to add a Cell per WALEdit to the cells array.
CellBuilder cellBuilder = CellBuilderFactory.create(CellBuilderType.DEEP_COPY);
// Make cells whose row, family, cell, value, and ts are == 'i'.
- Cell cell = cellBuilder.
- setRow(bytes).
- setFamily(bytes).
- setQualifier(bytes).
- setType(Cell.Type.Put).
- setTimestamp(i).
- setValue(bytes).build();
+ Cell cell = cellBuilder.setRow(bytes).setFamily(bytes).setQualifier(bytes)
+ .setType(Cell.Type.Put).setTimestamp(i).setValue(bytes).build();
cells.add(cell);
}
// Now wrap our cells array in a CellScanner that we can pass in to replicateEntries. It has
@@ -192,11 +170,13 @@ public class TestWALEntrySinkFilter {
/**
 * Simple filter that will filter out any entry whose writeTime is <= 5.
*/
- public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl implements WALEntrySinkFilter {
- public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {}
+ public static class IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl
+ implements WALEntrySinkFilter {
+ public IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl() {
+ }
@Override
- public void init(Connection connection) {
+ public void init(AsyncConnection conn) {
// Do nothing.
}
@@ -210,335 +190,48 @@ public class TestWALEntrySinkFilter {
}
}
- /**
- * A DevNull Connection whose only purpose is checking what edits made it through. See down in
- * {@link Table#batch(List, Object[])}.
- */
- public static class DevNullConnection implements Connection {
- private final Configuration configuration;
+ public static class DevNullAsyncRegistry extends DummyAsyncRegistry {
- DevNullConnection(Configuration configuration, ExecutorService es, User user) {
- this.configuration = configuration;
+ public DevNullAsyncRegistry(Configuration conf) {
}
@Override
- public void abort(String why, Throwable e) {
-
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
-
- @Override
- public Configuration getConfiguration() {
- return this.configuration;
- }
-
- @Override
- public BufferedMutator getBufferedMutator(TableName tableName) throws IOException {
- return null;
- }
-
- @Override
- public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws IOException {
- return null;
- }
-
- @Override
- public RegionLocator getRegionLocator(TableName tableName) throws IOException {
- return null;
- }
-
- @Override
- public Admin getAdmin() throws IOException {
- return null;
+ public CompletableFuture<String> getClusterId() {
+ return CompletableFuture.completedFuture("test");
}
+ }
- @Override
- public void close() throws IOException {
+ public static class DevNullAsyncClusterConnection extends DummyAsyncClusterConnection {
- }
+ private final Configuration conf;
- @Override
- public boolean isClosed() {
- return false;
+ public DevNullAsyncClusterConnection(Configuration conf, Object registry, String clusterId,
+ SocketAddress localAddress, User user) {
+ this.conf = conf;
}
@Override
- public TableBuilder getTableBuilder(final TableName tableName, ExecutorService pool) {
- return new TableBuilder() {
- @Override
- public TableBuilder setOperationTimeout(int timeout) {
- return this;
- }
+ public AsyncTable<AdvancedScanResultConsumer> getTable(TableName tableName) {
+ return new DummyAsyncTable<AdvancedScanResultConsumer>() {
@Override
- public TableBuilder setRpcTimeout(int timeout) {
- return this;
- }
-
- @Override
- public TableBuilder setReadRpcTimeout(int timeout) {
- return this;
- }
-
- @Override
- public TableBuilder setWriteRpcTimeout(int timeout) {
- return this;
- }
-
- @Override
- public Table build() {
- return new Table() {
- @Override
- public TableName getName() {
- return tableName;
- }
-
- @Override
- public Configuration getConfiguration() {
- return configuration;
- }
-
- @Override
- public HTableDescriptor getTableDescriptor() throws IOException {
- return null;
- }
-
- @Override
- public TableDescriptor getDescriptor() throws IOException {
- return null;
- }
-
- @Override
- public boolean exists(Get get) throws IOException {
- return false;
- }
-
- @Override
- public boolean[] exists(List<Get> gets) throws IOException {
- return new boolean[0];
- }
-
- @Override
- public void batch(List<? extends Row> actions, Object[] results) throws IOException, InterruptedException {
- for (Row action: actions) {
- // Row is the index of the loop above where we make WALEntry and Cells.
- int row = Bytes.toInt(action.getRow());
- assertTrue("" + row, row> BOUNDARY);
- UNFILTERED.incrementAndGet();
- }
- }
-
- @Override
- public <R> void batchCallback(List<? extends Row> actions, Object[] results, Batch.Callback<R> callback) throws IOException, InterruptedException {
-
- }
-
- @Override
- public Result get(Get get) throws IOException {
- return null;
- }
-
- @Override
- public Result[] get(List<Get> gets) throws IOException {
- return new Result[0];
- }
-
- @Override
- public ResultScanner getScanner(Scan scan) throws IOException {
- return null;
- }
-
- @Override
- public ResultScanner getScanner(byte[] family) throws IOException {
- return null;
- }
-
- @Override
- public ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException {
- return null;
- }
-
- @Override
- public void put(Put put) throws IOException {
-
- }
-
- @Override
- public void put(List<Put> puts) throws IOException {
-
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException {
- return false;
- }
-
- @Override
- public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Put put) throws IOException {
- return false;
- }
-
- @Override
- public void delete(Delete delete) throws IOException {
-
- }
-
- @Override
- public void delete(List<Delete> deletes) throws IOException {
-
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, byte[] value, Delete delete) throws IOException {
- return false;
- }
-
- @Override
- public boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, Delete delete) throws IOException {
- return false;
- }
-
- @Override
- public CheckAndMutateBuilder checkAndMutate(byte[] row, byte[] family) {
- return null;
- }
-
- @Override
- public void mutateRow(RowMutations rm) throws IOException {
-
- }
-
- @Override
- public Result append(Append append) throws IOException {
- return null;
- }
-
- @Override
- public Result increment(Increment increment) throws IOException {
- return null;
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException {
- return 0;
- }
-
- @Override
- public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability) throws IOException {
- return 0;
- }
-
- @Override
- public void close() throws IOException {
-
- }
-
- @Override
- public CoprocessorRpcChannel coprocessorService(byte[] row) {
- return null;
- }
-
- @Override
- public <T extends com.google.protobuf.Service, R> Map<byte[], R> coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable) throws com.google.protobuf.ServiceException, Throwable {
- return null;
- }
-
- @Override
- public <T extends com.google.protobuf.Service, R> void coprocessorService(Class<T> service, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
-
- }
-
- @Override
- public <R extends com.google.protobuf.Message> Map<byte[], R> batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype) throws com.google.protobuf.ServiceException, Throwable {
- return null;
- }
-
- @Override
- public <R extends com.google.protobuf.Message> void batchCoprocessorService(com.google.protobuf.Descriptors.MethodDescriptor methodDescriptor, com.google.protobuf.Message request, byte[] startKey, byte[] endKey, R responsePrototype, Batch.Callback<R> callback) throws com.google.protobuf.ServiceException, Throwable {
-
- }
-
- @Override
- public boolean checkAndMutate(byte[] row, byte[] family, byte[] qualifier, CompareOperator op, byte[] value, RowMutations mutation) throws IOException {
- return false;
- }
-
- @Override
- public long getRpcTimeout(TimeUnit unit) {
- return 0;
- }
-
- @Override
- public int getRpcTimeout() {
- return 0;
- }
-
- @Override
- public void setRpcTimeout(int rpcTimeout) {
-
- }
-
- @Override
- public long getReadRpcTimeout(TimeUnit unit) {
- return 0;
- }
-
- @Override
- public int getReadRpcTimeout() {
- return 0;
- }
-
- @Override
- public void setReadRpcTimeout(int readRpcTimeout) {
-
- }
-
- @Override
- public long getWriteRpcTimeout(TimeUnit unit) {
- return 0;
- }
-
- @Override
- public int getWriteRpcTimeout() {
- return 0;
- }
-
- @Override
- public void setWriteRpcTimeout(int writeRpcTimeout) {
-
- }
-
- @Override
- public long getOperationTimeout(TimeUnit unit) {
- return 0;
- }
-
- @Override
- public int getOperationTimeout() {
- return 0;
- }
-
- @Override
- public void setOperationTimeout(int operationTimeout) {
- }
-
- @Override
- public RegionLocator getRegionLocator() throws IOException {
- return null;
- }
- };
+ public <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
+ List<T> list = new ArrayList<>(actions.size());
+ for (Row action : actions) {
+ // Row is the index of the loop above where we make WALEntry and Cells.
+ int row = Bytes.toInt(action.getRow());
+ assertTrue("" + row, row > BOUNDARY);
+ UNFILTERED.incrementAndGet();
+ list.add(null);
+ }
+ return CompletableFuture.completedFuture(list);
}
};
}
@Override
- public void clearRegionLocationCache() {
+ public Configuration getConfiguration() {
+ return conf;
}
}
}
-
-
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
index 523b82f..b9de705 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java
@@ -124,7 +124,7 @@ import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.Permission.Action;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.SecurityTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.Threads;
@@ -1111,14 +1111,8 @@ public class TestAccessController extends SecureTestUtil {
}
private void bulkLoadHFile(TableName tableName) throws Exception {
- try (Connection conn = ConnectionFactory.createConnection(conf);
- Admin admin = conn.getAdmin();
- RegionLocator locator = conn.getRegionLocator(tableName);
- Table table = conn.getTable(tableName)) {
- TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- loader.doBulkLoad(loadPath, admin, table, locator);
- }
+ TEST_UTIL.waitUntilAllRegionsAssigned(tableName);
+ BulkLoadHFiles.create(conf).bulkLoad(tableName, loadPath);
}
private static void setPermission(FileSystem fs, Path dir, FsPermission perm)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
similarity index 83%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
index 7c04edc..e85fc1a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.tool;
+import static org.apache.hadoop.hbase.HBaseTestingUtility.countRows;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -72,11 +73,11 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
* faster than the full MR cluster tests in TestHFileOutputFormat
*/
@Category({ MiscTests.class, LargeTests.class })
-public class TestLoadIncrementalHFiles {
+public class TestBulkLoadHFiles {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestLoadIncrementalHFiles.class);
+ HBaseClassTestRule.forClass(TestBulkLoadHFiles.class);
@Rule
public TestName tn = new TestName();
@@ -89,14 +90,14 @@ public class TestLoadIncrementalHFiles {
static final int MAX_FILES_PER_REGION_PER_FAMILY = 4;
private static final byte[][] SPLIT_KEYS =
- new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ppp") };
static HBaseTestingUtility util = new HBaseTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
- util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+ util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
MAX_FILES_PER_REGION_PER_FAMILY);
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
@@ -119,7 +120,7 @@ public class TestLoadIncrementalHFiles {
public void testSimpleLoadWithMap() throws Exception {
runTest("testSimpleLoadWithMap", BloomType.NONE,
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
- new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
true);
}
@@ -130,16 +131,16 @@ public class TestLoadIncrementalHFiles {
public void testSimpleLoad() throws Exception {
runTest("testSimpleLoad", BloomType.NONE,
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
- new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, });
}
@Test
public void testSimpleLoadWithFileCopy() throws Exception {
String testName = tn.getMethodName();
final byte[] TABLE_NAME = Bytes.toBytes("mytable_" + testName);
- runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE),
- false, null, new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
- new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
+ runTest(testName, buildHTD(TableName.valueOf(TABLE_NAME), BloomType.NONE), false, null,
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, },
false, true, 2);
}
@@ -150,7 +151,7 @@ public class TestLoadIncrementalHFiles {
public void testRegionCrossingLoad() throws Exception {
runTest("testRegionCrossingLoad", BloomType.NONE,
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
/**
@@ -160,7 +161,7 @@ public class TestLoadIncrementalHFiles {
public void testRegionCrossingRowBloom() throws Exception {
runTest("testRegionCrossingLoadRowBloom", BloomType.ROW,
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
/**
@@ -170,7 +171,7 @@ public class TestLoadIncrementalHFiles {
public void testRegionCrossingRowColBloom() throws Exception {
runTest("testRegionCrossingLoadRowColBloom", BloomType.ROWCOL,
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
/**
@@ -181,9 +182,9 @@ public class TestLoadIncrementalHFiles {
public void testSimpleHFileSplit() throws Exception {
runTest("testHFileSplit", BloomType.NONE,
new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
- Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
+ Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
- new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
+ new byte[][] { Bytes.toBytes("mmm"), Bytes.toBytes("zzz") }, });
}
/**
@@ -217,27 +218,27 @@ public class TestLoadIncrementalHFiles {
public void testSplitALot() throws Exception {
runTest("testSplitALot", BloomType.NONE,
new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("bbb"), Bytes.toBytes("ccc"),
- Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
- Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
- Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
- Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
- Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
+ Bytes.toBytes("ddd"), Bytes.toBytes("eee"), Bytes.toBytes("fff"), Bytes.toBytes("ggg"),
+ Bytes.toBytes("hhh"), Bytes.toBytes("iii"), Bytes.toBytes("lll"), Bytes.toBytes("mmm"),
+ Bytes.toBytes("nnn"), Bytes.toBytes("ooo"), Bytes.toBytes("ppp"), Bytes.toBytes("qqq"),
+ Bytes.toBytes("rrr"), Bytes.toBytes("sss"), Bytes.toBytes("ttt"), Bytes.toBytes("uuu"),
+ Bytes.toBytes("vvv"), Bytes.toBytes("zzz"), },
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") }, });
}
private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
runTest("testHFileSplit" + bloomType + "Bloom", bloomType,
new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"), Bytes.toBytes("jjj"),
- Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
+ Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), },
new byte[][][] { new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
- new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
+ new byte[][] { Bytes.toBytes("fff"), Bytes.toBytes("zzz") }, });
}
private TableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
return TableDescriptorBuilder.newBuilder(tableName)
- .setColumnFamily(
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
- .build();
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBloomFilterType(bloomType).build())
+ .build();
}
private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges)
@@ -265,28 +266,24 @@ public class TestLoadIncrementalHFiles {
runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
useMap, 2);
-
- /* Run the test bulkloading the table from a depth of 3
- directory structure is now
- baseDirectory
- -- regionDir
- -- familyDir
- -- storeFileDir
- */
+ /*
+ * Run the test bulkloading the table from a depth of 3 directory structure is now baseDirectory
+ * -- regionDir -- familyDir -- storeFileDir
+ */
if (preCreateTable) {
- runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges,
- false, 3);
+ runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, false,
+ 3);
}
// Run the test bulkloading the table to the specified namespace
final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
- runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges,
- useMap, 2);
+ runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, useMap,
+ 2);
}
private void runTest(String testName, TableName tableName, BloomType bloomType,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges,
- boolean useMap, int depth) throws Exception {
+ boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
+ int depth) throws Exception {
TableDescriptor htd = buildHTD(tableName, bloomType);
runTest(testName, htd, preCreateTable, tableSplitKeys, hfileRanges, useMap, false, depth);
}
@@ -296,7 +293,7 @@ public class TestLoadIncrementalHFiles {
byte[][][] hfileRanges, boolean useMap, boolean deleteFile, boolean copyFiles,
int initRowCount, int factor) throws Exception {
return loadHFiles(testName, htd, util, fam, qual, preCreateTable, tableSplitKeys, hfileRanges,
- useMap, deleteFile, copyFiles, initRowCount, factor, 2);
+ useMap, deleteFile, copyFiles, initRowCount, factor, 2);
}
public static int loadHFiles(String testName, TableDescriptor htd, HBaseTestingUtility util,
@@ -343,7 +340,7 @@ public class TestLoadIncrementalHFiles {
Configuration conf = util.getConfiguration();
if (copyFiles) {
- conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
+ conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
}
BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
@@ -374,26 +371,23 @@ public class TestLoadIncrementalHFiles {
}
}
- Table table = util.getConnection().getTable(tableName);
- try {
- assertEquals(initRowCount + expectedRows, util.countRows(table));
- } finally {
- table.close();
+ try (Table table = util.getConnection().getTable(tableName)) {
+ assertEquals(initRowCount + expectedRows, countRows(table));
}
return expectedRows;
}
- private void runTest(String testName, TableDescriptor htd,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap,
- boolean copyFiles, int depth) throws Exception {
+ private void runTest(String testName, TableDescriptor htd, boolean preCreateTable,
+ byte[][] tableSplitKeys, byte[][][] hfileRanges, boolean useMap, boolean copyFiles, int depth)
+ throws Exception {
loadHFiles(testName, htd, util, FAMILY, QUALIFIER, preCreateTable, tableSplitKeys, hfileRanges,
useMap, true, copyFiles, 0, 1000, depth);
final TableName tableName = htd.getTableName();
// verify staging folder has been cleaned up
Path stagingBasePath =
- new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
+ new Path(FSUtils.getRootDir(util.getConfiguration()), HConstants.BULKLOAD_STAGING_DIR_NAME);
FileSystem fs = util.getTestFileSystem();
if (fs.exists(stagingBasePath)) {
FileStatus[] files = fs.listStatus(stagingBasePath);
@@ -419,7 +413,7 @@ public class TestLoadIncrementalHFiles {
Path familyDir = new Path(dir, Bytes.toString(FAMILY));
// table has these split points
byte[][] tableSplitKeys = new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("fff"),
- Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
+ Bytes.toBytes("jjj"), Bytes.toBytes("ppp"), Bytes.toBytes("uuu"), Bytes.toBytes("zzz"), };
// creating an hfile that has values that span the split points.
byte[] from = Bytes.toBytes("ddd");
@@ -432,13 +426,11 @@ public class TestLoadIncrementalHFiles {
TableDescriptor htd = buildHTD(tableName, BloomType.NONE);
util.getAdmin().createTable(htd, tableSplitKeys);
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
- String[] args = { dir.toString(), tableName.toString() };
- loader.run(args);
+ BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(tableName, dir);
Table table = util.getConnection().getTable(tableName);
try {
- assertEquals(expectedRows, util.countRows(table));
+ assertEquals(expectedRows, countRows(table));
HFileTestUtil.verifyTags(table);
} finally {
table.close();
@@ -454,16 +446,16 @@ public class TestLoadIncrementalHFiles {
public void testNonexistentColumnFamilyLoad() throws Exception {
String testName = tn.getMethodName();
byte[][][] hFileRanges =
- new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
- new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
+ new byte[][][] { new byte[][] { Bytes.toBytes("aaa"), Bytes.toBytes("ccc") },
+ new byte[][] { Bytes.toBytes("ddd"), Bytes.toBytes("ooo") }, };
byte[] TABLE = Bytes.toBytes("mytable_" + testName);
// set real family name to upper case in purpose to simulate the case that
// family name in HFiles is invalid
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf(TABLE))
- .setColumnFamily(ColumnFamilyDescriptorBuilder
- .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
- .build();
+ .setColumnFamily(ColumnFamilyDescriptorBuilder
+ .of(Bytes.toBytes(new String(FAMILY).toUpperCase(Locale.ROOT))))
+ .build();
try {
runTest(testName, htd, true, SPLIT_KEYS, hFileRanges, false, false, 2);
@@ -474,7 +466,7 @@ public class TestLoadIncrementalHFiles {
String errMsg = e.getMessage();
assertTrue(
"Incorrect exception message, expected message: [" + EXPECTED_MSG_FOR_NON_EXISTING_FAMILY +
- "], current message: [" + errMsg + "]",
+ "], current message: [" + errMsg + "]",
errMsg.contains(EXPECTED_MSG_FOR_NON_EXISTING_FAMILY));
}
}
@@ -517,10 +509,8 @@ public class TestLoadIncrementalHFiles {
} else {
table = util.getConnection().getTable(TableName.valueOf(tableName));
}
-
- final String[] args = { dir.toString(), tableName };
- new LoadIncrementalHFiles(util.getConfiguration()).run(args);
- assertEquals(500, util.countRows(table));
+ BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(TableName.valueOf(tableName), dir);
+ assertEquals(500, countRows(table));
} finally {
if (table != null) {
table.close();
@@ -560,7 +550,7 @@ public class TestLoadIncrementalHFiles {
Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");
- LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
+ BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
int rowCount = verifyHFile(bottomOut);
@@ -594,14 +584,14 @@ public class TestLoadIncrementalHFiles {
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
ColumnFamilyDescriptor familyDesc =
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
+ ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setDataBlockEncoding(cfEncoding).build();
HFileTestUtil.createHFileWithDataBlockEncoding(util.getConfiguration(), fs, testIn,
bulkloadEncoding, FAMILY, QUALIFIER, Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
Path bottomOut = new Path(dir, "bottom.out");
Path topOut = new Path(dir, "top.out");
- LoadIncrementalHFiles.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
+ BulkLoadHFilesTool.splitStoreFile(util.getConfiguration(), testIn, familyDesc,
Bytes.toBytes("ggg"), bottomOut, topOut);
int rowCount = verifyHFile(bottomOut);
@@ -612,7 +602,7 @@ public class TestLoadIncrementalHFiles {
private int verifyHFile(Path p) throws IOException {
Configuration conf = util.getConfiguration();
HFile.Reader reader =
- HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
+ HFile.createReader(p.getFileSystem(conf), p, new CacheConfig(conf), true, conf);
reader.loadFileInfo();
HFileScanner scanner = reader.getScanner(false, false);
scanner.seekTo();
@@ -682,7 +672,7 @@ public class TestLoadIncrementalHFiles {
last = "w";
addStartEndKeysForTest(map, Bytes.toBytes(first), Bytes.toBytes(last));
- byte[][] keysArray = LoadIncrementalHFiles.inferBoundaries(map);
+ byte[][] keysArray = BulkLoadHFilesTool.inferBoundaries(map);
byte[][] compare = new byte[3][];
compare[0] = Bytes.toBytes("m");
compare[1] = Bytes.toBytes("r");
@@ -709,22 +699,21 @@ public class TestLoadIncrementalHFiles {
FAMILY, QUALIFIER, from, to, 1000);
}
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
- String[] args = { dir.toString(), "mytable_testLoadTooMayHFiles" };
try {
- loader.run(args);
+ BulkLoadHFiles.create(util.getConfiguration())
+ .bulkLoad(TableName.valueOf("mytable_testLoadTooMayHFiles"), dir);
fail("Bulk loading too many files should fail");
} catch (IOException ie) {
assertTrue(ie.getMessage()
- .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
+ .contains("Trying to load more than " + MAX_FILES_PER_REGION_PER_FAMILY + " hfiles"));
}
}
@Test(expected = TableNotFoundException.class)
public void testWithoutAnExistingTableAndCreateTableSetToNo() throws Exception {
Configuration conf = util.getConfiguration();
- conf.set(LoadIncrementalHFiles.CREATE_TABLE_CONF_KEY, "no");
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no");
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
String[] args = { "directory", "nonExistingTable" };
loader.run(args);
}
@@ -741,19 +730,11 @@ public class TestLoadIncrementalHFiles {
byte[] to = Bytes.toBytes("end");
Configuration conf = util.getConfiguration();
String tableName = tn.getMethodName();
- Table table = util.createTable(TableName.valueOf(tableName), family);
- HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
- QUALIFIER, from, to, 1000);
-
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
- String[] args = { dir.toString(), tableName };
- try {
- loader.run(args);
- assertEquals(1000, util.countRows(table));
- } finally {
- if (null != table) {
- table.close();
- }
+ try (Table table = util.createTable(TableName.valueOf(tableName), family)) {
+ HFileTestUtil.createHFile(conf, fs, new Path(familyDir, "hfile"), Bytes.toBytes(family),
+ QUALIFIER, from, to, 1000);
+ BulkLoadHFiles.create(conf).bulkLoad(table.getName(), dir);
+ assertEquals(1000, countRows(table));
}
}
}
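The renamed test also keeps the command-line path: BulkLoadHFilesTool still exposes run(String[]) and honours the CREATE_TABLE_CONF_KEY switch, as testWithoutAnExistingTableAndCreateTableSetToNo shows. A hedged sketch of that invocation style; the argument handling below is a placeholder, not code from this patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

public class RunBulkLoadTool {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // With the switch set to "no", loading into a missing table fails with
    // TableNotFoundException instead of creating the table on the fly.
    conf.set(BulkLoadHFiles.CREATE_TABLE_CONF_KEY, "no");
    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
    // args[0] = directory of HFiles, args[1] = table name (placeholders).
    loader.run(new String[] { args[0], args[1] });
  }
}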
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
new file mode 100644
index 0000000..2aef16e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesSplitRecovery.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Deque;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+
+/**
+ * Test cases for the atomic load error handling of the bulk load functionality.
+ */
+@Category({ MiscTests.class, LargeTests.class })
+public class TestBulkLoadHFilesSplitRecovery {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBulkLoadHFilesSplitRecovery.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
+
+ static HBaseTestingUtility util;
+ // used by secure subclass
+ static boolean useSecure = false;
+
+ final static int NUM_CFS = 10;
+ final static byte[] QUAL = Bytes.toBytes("qual");
+ final static int ROWCOUNT = 100;
+
+ private final static byte[][] families = new byte[NUM_CFS][];
+
+ @Rule
+ public TestName name = new TestName();
+
+ static {
+ for (int i = 0; i < NUM_CFS; i++) {
+ families[i] = Bytes.toBytes(family(i));
+ }
+ }
+
+ static byte[] rowkey(int i) {
+ return Bytes.toBytes(String.format("row_%08d", i));
+ }
+
+ static String family(int i) {
+ return String.format("family_%04d", i);
+ }
+
+ static byte[] value(int i) {
+ return Bytes.toBytes(String.format("%010d", i));
+ }
+
+ public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
+ byte[] val = value(value);
+ for (int i = 0; i < NUM_CFS; i++) {
+ Path testIn = new Path(dir, family(i));
+
+ TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
+ Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
+ }
+ }
+
+ private TableDescriptor createTableDesc(TableName name, int cfs) {
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
+ IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
+ .forEachOrdered(builder::setColumnFamily);
+ return builder.build();
+ }
+
+ /**
+ * Creates a table with the given table name and the specified number of column families if the
+ * table does not already exist.
+ */
+ private void setupTable(final Connection connection, TableName table, int cfs)
+ throws IOException {
+ try {
+ LOG.info("Creating table " + table);
+ try (Admin admin = connection.getAdmin()) {
+ admin.createTable(createTableDesc(table, cfs));
+ }
+ } catch (TableExistsException tee) {
+ LOG.info("Table " + table + " already exists");
+ }
+ }
+
+ /**
+ * Creates a table with the given table name, the specified number of column families,<br>
+ * and split keys if the table does not already exist.
+ * @param table
+ * @param cfs
+ * @param SPLIT_KEYS
+ */
+ private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
+ throws IOException {
+ try {
+ LOG.info("Creating table " + table);
+ util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
+ } catch (TableExistsException tee) {
+ LOG.info("Table " + table + " already exists");
+ }
+ }
+
+ private Path buildBulkFiles(TableName table, int value) throws Exception {
+ Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
+ Path bulk1 = new Path(dir, table.getNameAsString() + value);
+ FileSystem fs = util.getTestFileSystem();
+ buildHFiles(fs, bulk1, value);
+ return bulk1;
+ }
+
+ /**
+ * Populate table with known values.
+ */
+ private void populateTable(final Connection connection, TableName table, int value)
+ throws Exception {
+ // create HFiles for different column families
+ Path dir = buildBulkFiles(table, value);
+ BulkLoadHFiles.create(util.getConfiguration()).bulkLoad(table, dir);
+ }
+
+ /**
+ * Split the known table in half. (this is hard coded for this test suite)
+ */
+ private void forceSplit(TableName table) {
+ try {
+ // need the region server call to be synchronous, but that API isn't visible here.
+ HRegionServer hrs = util.getRSForFirstRegionInTable(table);
+
+ for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+ if (hri.getTable().equals(table)) {
+ util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
+ // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
+ }
+ }
+
+ // verify that split completed.
+ int regions;
+ do {
+ regions = 0;
+ for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
+ if (hri.getTable().equals(table)) {
+ regions++;
+ }
+ }
+ if (regions != 2) {
+ LOG.info("Taking some time to complete split...");
+ Thread.sleep(250);
+ }
+ } while (regions != 2);
+ } catch (IOException e) {
+ e.printStackTrace();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @BeforeClass
+ public static void setupCluster() throws Exception {
+ util = new HBaseTestingUtility();
+ util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ util.startMiniCluster(1);
+ }
+
+ @AfterClass
+ public static void teardownCluster() throws Exception {
+ util.shutdownMiniCluster();
+ }
+
+ /**
+ * Checks that all columns have the expected value and that there is the expected number of rows.
+ * @throws IOException
+ */
+ void assertExpectedTable(TableName table, int count, int value) throws IOException {
+ TableDescriptor htd = util.getAdmin().getDescriptor(table);
+ assertNotNull(htd);
+ try (Table t = util.getConnection().getTable(table);
+ ResultScanner sr = t.getScanner(new Scan())) {
+ int i = 0;
+ for (Result r; (r = sr.next()) != null;) {
+ r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+ .forEach(v -> assertArrayEquals(value(value), v));
+ i++;
+ }
+ assertEquals(count, i);
+ } catch (IOException e) {
+ fail("Failed due to exception");
+ }
+ }
+
+ private static <T> CompletableFuture<T> failedFuture(Throwable error) {
+ CompletableFuture<T> future = new CompletableFuture<>();
+ future.completeExceptionally(error);
+ return future;
+ }
+
+ private static AsyncClusterConnection mockAndInjectError(AsyncClusterConnection conn) {
+ AsyncClusterConnection errConn = spy(conn);
+ doReturn(failedFuture(new IOException("injecting bulk load error"))).when(errConn)
+ .bulkLoad(any(), anyList(), any(), anyBoolean(), any(), any(), anyBoolean());
+ return errConn;
+ }
+
+ /**
+ * Test that shows that an exception thrown from the RS side will result in an exception on the
+ * bulk load client.
+ */
+ @Test(expected = IOException.class)
+ public void testBulkLoadPhaseFailure() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ final AtomicInteger attemptedCalls = new AtomicInteger();
+ Configuration conf = new Configuration(util.getConfiguration());
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {
+
+ @Override
+ protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
+ Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ AsyncClusterConnection c =
+ attemptedCalls.incrementAndGet() == 1 ? mockAndInjectError(conn) : conn;
+ super.bulkLoadPhase(c, tableName, queue, regionGroups, copyFiles, item2RegionMap);
+ }
+ };
+ Path dir = buildBulkFiles(table, 1);
+ loader.bulkLoad(table, dir);
+ }
+
+ /**
+ * Test that shows that an exception thrown from the RS side will result in the expected number of
+ * retries, set by {@link HConstants#HBASE_CLIENT_RETRIES_NUMBER}, when
+ * {@link BulkLoadHFiles#RETRY_ON_IO_EXCEPTION} is set.
+ */
+ @Test
+ public void testRetryOnIOException() throws Exception {
+ TableName table = TableName.valueOf(name.getMethodName());
+ AtomicInteger calls = new AtomicInteger(0);
+ setupTable(util.getConnection(), table, 10);
+ Configuration conf = new Configuration(util.getConfiguration());
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
+ conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true);
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf) {
+
+ @Override
+ protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
+ Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ if (calls.get() < conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+ calls.incrementAndGet();
+ super.bulkLoadPhase(mockAndInjectError(conn), tableName, queue, regionGroups, copyFiles,
+ item2RegionMap);
+ } else {
+ super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
+ }
+ }
+ };
+ Path dir = buildBulkFiles(table, 1);
+ loader.bulkLoad(table, dir);
+ assertEquals(calls.get(), 2);
+ }
+
+ /**
+ * This test exercises the path where there is a split after initial validation but before the
+ * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
+ * split just before the atomic region load.
+ */
+ @Test
+ public void testSplitWhileBulkLoadPhase() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ setupTable(util.getConnection(), table, 10);
+ populateTable(util.getConnection(), table, 1);
+ assertExpectedTable(table, ROWCOUNT, 1);
+
+ // Now let's cause trouble. This will occur after checks and cause bulk
+ // files to fail when we attempt to atomically import them. This is recoverable.
+ final AtomicInteger attemptedCalls = new AtomicInteger();
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
+
+ @Override
+ protected void bulkLoadPhase(AsyncClusterConnection conn, TableName tableName,
+ Deque<LoadQueueItem> queue, Multimap<ByteBuffer, LoadQueueItem> regionGroups,
+ boolean copyFiles, Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
+ int i = attemptedCalls.incrementAndGet();
+ if (i == 1) {
+ // On first attempt force a split.
+ forceSplit(table);
+ }
+ super.bulkLoadPhase(conn, tableName, queue, regionGroups, copyFiles, item2RegionMap);
+ }
+ };
+
+ // create HFiles for different column families
+ Path dir = buildBulkFiles(table, 2);
+ loader.bulkLoad(table, dir);
+
+ // check that data was loaded
+ // The three expected attempts are: 1) failure because a split is needed, 2)
+ // load of the split top, 3) load of the split bottom
+ assertEquals(3, attemptedCalls.get());
+ assertExpectedTable(table, ROWCOUNT, 2);
+ }
+
+ /**
+ * This test splits a table and attempts to bulk load. The bulk import files should be split
+ * before atomically importing.
+ */
+ @Test
+ public void testGroupOrSplitPresplit() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ setupTable(util.getConnection(), table, 10);
+ populateTable(util.getConnection(), table, 1);
+ assertExpectedTable(util.getConnection(), table, ROWCOUNT, 1);
+ forceSplit(table);
+
+ final AtomicInteger countedLqis = new AtomicInteger();
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
+
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
+ TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
+ List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
+ Pair<List<LoadQueueItem>, String> lqis =
+ super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
+ if (lqis != null && lqis.getFirst() != null) {
+ countedLqis.addAndGet(lqis.getFirst().size());
+ }
+ return lqis;
+ }
+ };
+
+ // create HFiles for different column families
+ Path dir = buildBulkFiles(table, 2);
+ loader.bulkLoad(table, dir);
+ assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
+ assertEquals(20, countedLqis.get());
+ }
+
+ /**
+ * This test creates a table with many small regions. The bulk load files will be split
+ * multiple times before all of them can be loaded successfully.
+ */
+ @Test
+ public void testSplitTmpFileCleanUp() throws Exception {
+ final TableName table = TableName.valueOf(name.getMethodName());
+ byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
+ Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
+ Bytes.toBytes("row_00000050") };
+ setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
+
+ BulkLoadHFiles loader = BulkLoadHFiles.create(util.getConfiguration());
+
+ // create HFiles
+ Path dir = buildBulkFiles(table, 2);
+ loader.bulkLoad(table, dir);
+ // family path
+ Path tmpPath = new Path(dir, family(0));
+ // TMP_DIR under family path
+ tmpPath = new Path(tmpPath, BulkLoadHFilesTool.TMP_DIR);
+ FileSystem fs = dir.getFileSystem(util.getConfiguration());
+ // HFiles have been split, so TMP_DIR exists
+ assertTrue(fs.exists(tmpPath));
+ // TMP_DIR should have been cleaned-up
+ assertNull(BulkLoadHFilesTool.TMP_DIR + " should be empty.", FSUtils.listStatus(fs, tmpPath));
+ assertExpectedTable(util.getConnection(), table, ROWCOUNT, 2);
+ }
+
+ /**
+ * This simulates a remote exception which should cause the bulk load to exit with an exception.
+ */
+ @Test(expected = IOException.class)
+ public void testGroupOrSplitFailure() throws Exception {
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ setupTable(util.getConnection(), tableName, 10);
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(util.getConfiguration()) {
+
+ private int i = 0;
+
+ @Override
+ protected Pair<List<LoadQueueItem>, String> groupOrSplit(AsyncClusterConnection conn,
+ TableName tableName, Multimap<ByteBuffer, LoadQueueItem> regionGroups, LoadQueueItem item,
+ List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
+ i++;
+
+ if (i == 5) {
+ throw new IOException("failure");
+ }
+ return super.groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
+ }
+ };
+
+ // create HFiles for different column families
+ Path dir = buildBulkFiles(tableName, 1);
+ loader.bulkLoad(tableName, dir);
+ }
+
+ /**
+ * Checks that all columns have the expected value and that there is the expected number of rows.
+ */
+ void assertExpectedTable(final Connection connection, TableName table, int count, int value)
+ throws IOException {
+ TableDescriptor htd = util.getAdmin().getDescriptor(table);
+ assertNotNull(htd);
+ try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
+ int i = 0;
+ for (Result r; (r = sr.next()) != null;) {
+ r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
+ .forEach(v -> assertArrayEquals(value(value), v));
+ i++;
+ }
+ assertEquals(count, i);
+ } catch (IOException e) {
+ fail("Failed due to exception");
+ }
+ }
+}
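Beyond the mock plumbing, the retry behaviour exercised by testRetryOnIOException above is driven purely by configuration. A minimal sketch of how a caller would opt in, assuming only the keys the test itself sets; the path and table name are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

public class RetryingBulkLoad {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to retrying the bulk load phase when the region server call throws an
    // IOException; the retry budget reuses the standard client retries number.
    conf.setBoolean(BulkLoadHFiles.RETRY_ON_IO_EXCEPTION, true);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
    BulkLoadHFiles.create(conf)
        .bulkLoad(TableName.valueOf(args[1]), new Path(args[0])); // placeholder args
  }
}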
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
deleted file mode 100644
index fcc1bb8..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFilesSplitRecovery.java
+++ /dev/null
@@ -1,630 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.tool;
-
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.Deque;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.IntStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableExistsException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ClientServiceCallable;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionInfoBuilder;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.TestHRegionServerBulkLoad;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
-
-/**
- * Test cases for the atomic load error handling of the bulk load functionality.
- */
-@Category({ MiscTests.class, LargeTests.class })
-public class TestLoadIncrementalHFilesSplitRecovery {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestLoadIncrementalHFilesSplitRecovery.class);
-
- private static final Logger LOG = LoggerFactory.getLogger(TestHRegionServerBulkLoad.class);
-
- static HBaseTestingUtility util;
- // used by secure subclass
- static boolean useSecure = false;
-
- final static int NUM_CFS = 10;
- final static byte[] QUAL = Bytes.toBytes("qual");
- final static int ROWCOUNT = 100;
-
- private final static byte[][] families = new byte[NUM_CFS][];
-
- @Rule
- public TestName name = new TestName();
-
- static {
- for (int i = 0; i < NUM_CFS; i++) {
- families[i] = Bytes.toBytes(family(i));
- }
- }
-
- static byte[] rowkey(int i) {
- return Bytes.toBytes(String.format("row_%08d", i));
- }
-
- static String family(int i) {
- return String.format("family_%04d", i);
- }
-
- static byte[] value(int i) {
- return Bytes.toBytes(String.format("%010d", i));
- }
-
- public static void buildHFiles(FileSystem fs, Path dir, int value) throws IOException {
- byte[] val = value(value);
- for (int i = 0; i < NUM_CFS; i++) {
- Path testIn = new Path(dir, family(i));
-
- TestHRegionServerBulkLoad.createHFile(fs, new Path(testIn, "hfile_" + i),
- Bytes.toBytes(family(i)), QUAL, val, ROWCOUNT);
- }
- }
-
- private TableDescriptor createTableDesc(TableName name, int cfs) {
- TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(name);
- IntStream.range(0, cfs).mapToObj(i -> ColumnFamilyDescriptorBuilder.of(family(i)))
- .forEachOrdered(builder::setColumnFamily);
- return builder.build();
- }
-
- /**
- * Creates a table with given table name and specified number of column families if the table does
- * not already exist.
- */
- private void setupTable(final Connection connection, TableName table, int cfs)
- throws IOException {
- try {
- LOG.info("Creating table " + table);
- try (Admin admin = connection.getAdmin()) {
- admin.createTable(createTableDesc(table, cfs));
- }
- } catch (TableExistsException tee) {
- LOG.info("Table " + table + " already exists");
- }
- }
-
- /**
- * Creates a table with given table name,specified number of column families<br>
- * and splitkeys if the table does not already exist.
- * @param table
- * @param cfs
- * @param SPLIT_KEYS
- */
- private void setupTableWithSplitkeys(TableName table, int cfs, byte[][] SPLIT_KEYS)
- throws IOException {
- try {
- LOG.info("Creating table " + table);
- util.createTable(createTableDesc(table, cfs), SPLIT_KEYS);
- } catch (TableExistsException tee) {
- LOG.info("Table " + table + " already exists");
- }
- }
-
- private Path buildBulkFiles(TableName table, int value) throws Exception {
- Path dir = util.getDataTestDirOnTestFS(table.getNameAsString());
- Path bulk1 = new Path(dir, table.getNameAsString() + value);
- FileSystem fs = util.getTestFileSystem();
- buildHFiles(fs, bulk1, value);
- return bulk1;
- }
-
- /**
- * Populate table with known values.
- */
- private void populateTable(final Connection connection, TableName table, int value)
- throws Exception {
- // create HFiles for different column families
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
- Path bulk1 = buildBulkFiles(table, value);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(bulk1, admin, t, locator);
- }
- }
-
- /**
- * Split the known table in half. (this is hard coded for this test suite)
- */
- private void forceSplit(TableName table) {
- try {
- // need to call regions server to by synchronous but isn't visible.
- HRegionServer hrs = util.getRSForFirstRegionInTable(table);
-
- for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
- if (hri.getTable().equals(table)) {
- util.getAdmin().splitRegionAsync(hri.getRegionName(), rowkey(ROWCOUNT / 2));
- // ProtobufUtil.split(null, hrs.getRSRpcServices(), hri, rowkey(ROWCOUNT / 2));
- }
- }
-
- // verify that split completed.
- int regions;
- do {
- regions = 0;
- for (RegionInfo hri : ProtobufUtil.getOnlineRegions(hrs.getRSRpcServices())) {
- if (hri.getTable().equals(table)) {
- regions++;
- }
- }
- if (regions != 2) {
- LOG.info("Taking some time to complete split...");
- Thread.sleep(250);
- }
- } while (regions != 2);
- } catch (IOException e) {
- e.printStackTrace();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
-
- @BeforeClass
- public static void setupCluster() throws Exception {
- util = new HBaseTestingUtility();
- util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
- util.startMiniCluster(1);
- }
-
- @AfterClass
- public static void teardownCluster() throws Exception {
- util.shutdownMiniCluster();
- }
-
- /**
- * Checks that all columns have the expected value and that there is the expected number of rows.
- * @throws IOException
- */
- void assertExpectedTable(TableName table, int count, int value) throws IOException {
- TableDescriptor htd = util.getAdmin().getDescriptor(table);
- assertNotNull(htd);
- try (Table t = util.getConnection().getTable(table);
- ResultScanner sr = t.getScanner(new Scan())) {
- int i = 0;
- for (Result r; (r = sr.next()) != null;) {
- r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
- .forEach(v -> assertArrayEquals(value(value), v));
- i++;
- }
- assertEquals(count, i);
- } catch (IOException e) {
- fail("Failed due to exception");
- }
- }
-
- /**
- * Test that shows that exception thrown from the RS side will result in an exception on the
- * LIHFile client.
- */
- @Test(expected = IOException.class)
- public void testBulkLoadPhaseFailure() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- final AtomicInteger attmptedCalls = new AtomicInteger();
- final AtomicInteger failedCalls = new AtomicInteger();
- util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTable(connection, table, 10);
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
- @Override
- protected List<LoadQueueItem> tryAtomicRegionLoad(
- ClientServiceCallable<byte[]> serviceCallable, TableName tableName, final byte[] first,
- Collection<LoadQueueItem> lqis) throws IOException {
- int i = attmptedCalls.incrementAndGet();
- if (i == 1) {
- Connection errConn;
- try {
- errConn = getMockedConnection(util.getConfiguration());
- serviceCallable = this.buildClientServiceCallable(errConn, table, first, lqis, true);
- } catch (Exception e) {
- LOG.error(HBaseMarkers.FATAL, "mocking cruft, should never happen", e);
- throw new RuntimeException("mocking cruft, should never happen");
- }
- failedCalls.incrementAndGet();
- return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
- }
-
- return super.tryAtomicRegionLoad(serviceCallable, tableName, first, lqis);
- }
- };
- try {
- // create HFiles for different column families
- Path dir = buildBulkFiles(table, 1);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(dir, admin, t, locator);
- }
- } finally {
- util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- }
- fail("doBulkLoad should have thrown an exception");
- }
- }
-
- /**
- * Test that shows that exception thrown from the RS side will result in the expected number of
- * retries set by ${@link HConstants#HBASE_CLIENT_RETRIES_NUMBER} when
- * ${@link LoadIncrementalHFiles#RETRY_ON_IO_EXCEPTION} is set
- */
- @Test
- public void testRetryOnIOException() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- final AtomicInteger calls = new AtomicInteger(0);
- final Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
- util.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
- util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, true);
- final LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
- @Override
- protected List<LoadQueueItem> tryAtomicRegionLoad(
- ClientServiceCallable<byte[]> serverCallable, TableName tableName, final byte[] first,
- Collection<LoadQueueItem> lqis) throws IOException {
- if (calls.get() < util.getConfiguration().getInt(
- HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
- ClientServiceCallable<byte[]> newServerCallable = new ClientServiceCallable<byte[]>(conn,
- tableName, first, new RpcControllerFactory(util.getConfiguration()).newController(),
- HConstants.PRIORITY_UNSET) {
- @Override
- public byte[] rpcCall() throws Exception {
- throw new IOException("Error calling something on RegionServer");
- }
- };
- calls.getAndIncrement();
- return super.tryAtomicRegionLoad(newServerCallable, tableName, first, lqis);
- } else {
- return super.tryAtomicRegionLoad(serverCallable, tableName, first, lqis);
- }
- }
- };
- setupTable(conn, table, 10);
- Path dir = buildBulkFiles(table, 1);
- lih.doBulkLoad(dir, conn.getAdmin(), conn.getTable(table), conn.getRegionLocator(table));
- assertEquals(calls.get(), 2);
- util.getConfiguration().setBoolean(LoadIncrementalHFiles.RETRY_ON_IO_EXCEPTION, false);
- }
-
- private ClusterConnection getMockedConnection(final Configuration conf)
- throws IOException, org.apache.hbase.thirdparty.com.google.protobuf.ServiceException {
- ServerName sn = ServerName.valueOf("example.org", 1234, 0);
- RegionInfo hri = RegionInfoBuilder.FIRST_META_REGIONINFO;
- ClientProtos.ClientService.BlockingInterface client =
- Mockito.mock(ClientProtos.ClientService.BlockingInterface.class);
- Mockito
- .when(
- client.bulkLoadHFile((RpcController) Mockito.any(), (BulkLoadHFileRequest) Mockito.any()))
- .thenThrow(new ServiceException(new IOException("injecting bulk load error")));
- return HConnectionTestingUtility.getMockedConnectionAndDecorate(conf, null, client, sn, hri);
- }
-
- /**
- * This test exercises the path where there is a split after initial validation but before the
- * atomic bulk load call. We cannot use presplitting to test this path, so we actually inject a
- * split just before the atomic region load.
- */
- @Test
- public void testSplitWhileBulkLoadPhase() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTable(connection, table, 10);
- populateTable(connection, table, 1);
- assertExpectedTable(table, ROWCOUNT, 1);
-
- // Now let's cause trouble. This will occur after checks and cause bulk
- // files to fail when attempt to atomically import. This is recoverable.
- final AtomicInteger attemptedCalls = new AtomicInteger();
- LoadIncrementalHFiles lih2 = new LoadIncrementalHFiles(util.getConfiguration()) {
- @Override
- protected void bulkLoadPhase(final Table htable, final Connection conn,
- ExecutorService pool, Deque<LoadQueueItem> queue,
- final Multimap<ByteBuffer, LoadQueueItem> regionGroups, boolean copyFile,
- Map<LoadQueueItem, ByteBuffer> item2RegionMap) throws IOException {
- int i = attemptedCalls.incrementAndGet();
- if (i == 1) {
- // On first attempt force a split.
- forceSplit(table);
- }
- super.bulkLoadPhase(htable, conn, pool, queue, regionGroups, copyFile, item2RegionMap);
- }
- };
-
- // create HFiles for different column families
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- Path bulk = buildBulkFiles(table, 2);
- lih2.doBulkLoad(bulk, admin, t, locator);
- }
-
- // check that data was loaded
- // The three expected attempts are 1) failure because need to split, 2)
- // load of split top 3) load of split bottom
- assertEquals(3, attemptedCalls.get());
- assertExpectedTable(table, ROWCOUNT, 2);
- }
- }
-
- /**
- * This test splits a table and attempts to bulk load. The bulk import files should be split
- * before atomically importing.
- */
- @Test
- public void testGroupOrSplitPresplit() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTable(connection, table, 10);
- populateTable(connection, table, 1);
- assertExpectedTable(connection, table, ROWCOUNT, 1);
- forceSplit(table);
-
- final AtomicInteger countedLqis = new AtomicInteger();
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
- @Override
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
- final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- Pair<List<LoadQueueItem>, String> lqis =
- super.groupOrSplit(regionGroups, item, htable, startEndKeys);
- if (lqis != null && lqis.getFirst() != null) {
- countedLqis.addAndGet(lqis.getFirst().size());
- }
- return lqis;
- }
- };
-
- // create HFiles for different column families
- Path bulk = buildBulkFiles(table, 2);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(bulk, admin, t, locator);
- }
- assertExpectedTable(connection, table, ROWCOUNT, 2);
- assertEquals(20, countedLqis.get());
- }
- }
-
- /**
- * This test creates a table with many small regions. The bulk load files would be splitted
- * multiple times before all of them can be loaded successfully.
- */
- @Test
- public void testSplitTmpFileCleanUp() throws Exception {
- final TableName table = TableName.valueOf(name.getMethodName());
- byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000010"),
- Bytes.toBytes("row_00000020"), Bytes.toBytes("row_00000030"), Bytes.toBytes("row_00000040"),
- Bytes.toBytes("row_00000050") };
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTableWithSplitkeys(table, 10, SPLIT_KEYS);
-
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration());
-
- // create HFiles
- Path bulk = buildBulkFiles(table, 2);
- try (Table t = connection.getTable(table);
- RegionLocator locator = connection.getRegionLocator(table);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(bulk, admin, t, locator);
- }
- // family path
- Path tmpPath = new Path(bulk, family(0));
- // TMP_DIR under family path
- tmpPath = new Path(tmpPath, LoadIncrementalHFiles.TMP_DIR);
- FileSystem fs = bulk.getFileSystem(util.getConfiguration());
- // HFiles have been splitted, there is TMP_DIR
- assertTrue(fs.exists(tmpPath));
- // TMP_DIR should have been cleaned-up
- assertNull(LoadIncrementalHFiles.TMP_DIR + " should be empty.",
- FSUtils.listStatus(fs, tmpPath));
- assertExpectedTable(connection, table, ROWCOUNT, 2);
- }
- }
-
- /**
- * This simulates an remote exception which should cause LIHF to exit with an exception.
- */
- @Test(expected = IOException.class)
- public void testGroupOrSplitFailure() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- try (Connection connection = ConnectionFactory.createConnection(util.getConfiguration())) {
- setupTable(connection, tableName, 10);
-
- LoadIncrementalHFiles lih = new LoadIncrementalHFiles(util.getConfiguration()) {
- int i = 0;
-
- @Override
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
- final Table table, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- i++;
-
- if (i == 5) {
- throw new IOException("failure");
- }
- return super.groupOrSplit(regionGroups, item, table, startEndKeys);
- }
- };
-
- // create HFiles for different column families
- Path dir = buildBulkFiles(tableName, 1);
- try (Table t = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName);
- Admin admin = connection.getAdmin()) {
- lih.doBulkLoad(dir, admin, t, locator);
- }
- }
-
- fail("doBulkLoad should have thrown an exception");
- }
-
- @Test
- public void testGroupOrSplitWhenRegionHoleExistsInMeta() throws Exception {
- final TableName tableName = TableName.valueOf(name.getMethodName());
- byte[][] SPLIT_KEYS = new byte[][] { Bytes.toBytes("row_00000100") };
- // Share the connection. We were failing to find the table with our new reverse scan because it
- // looks for the first region, not any region -- that is how it works now. The code below removes
- // the first region in the test; it used to rely on the Connection cache holding the first region.
- Connection connection = ConnectionFactory.createConnection(util.getConfiguration());
- Table table = connection.getTable(tableName);
-
- setupTableWithSplitkeys(tableName, 10, SPLIT_KEYS);
- Path dir = buildBulkFiles(tableName, 2);
-
- final AtomicInteger countedLqis = new AtomicInteger();
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration()) {
-
- @Override
- protected Pair<List<LoadQueueItem>, String> groupOrSplit(
- Multimap<ByteBuffer, LoadQueueItem> regionGroups, final LoadQueueItem item,
- final Table htable, final Pair<byte[][], byte[][]> startEndKeys) throws IOException {
- Pair<List<LoadQueueItem>, String> lqis =
- super.groupOrSplit(regionGroups, item, htable, startEndKeys);
- if (lqis != null && lqis.getFirst() != null) {
- countedLqis.addAndGet(lqis.getFirst().size());
- }
- return lqis;
- }
- };
-
- // do bulkload when there is no region hole in hbase:meta.
- try (Table t = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName);
- Admin admin = connection.getAdmin()) {
- loader.doBulkLoad(dir, admin, t, locator);
- } catch (Exception e) {
- LOG.error("exception=", e);
- }
- // check if all the data are loaded into the table.
- this.assertExpectedTable(tableName, ROWCOUNT, 2);
-
- dir = buildBulkFiles(tableName, 3);
-
- // Mess it up by leaving a hole in the hbase:meta
- List<RegionInfo> regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
- for (RegionInfo regionInfo : regionInfos) {
- if (Bytes.equals(regionInfo.getStartKey(), HConstants.EMPTY_BYTE_ARRAY)) {
- MetaTableAccessor.deleteRegion(connection, regionInfo);
- break;
- }
- }
-
- try (Table t = connection.getTable(tableName);
- RegionLocator locator = connection.getRegionLocator(tableName);
- Admin admin = connection.getAdmin()) {
- loader.doBulkLoad(dir, admin, t, locator);
- } catch (Exception e) {
- LOG.error("exception=", e);
- assertTrue("IOException expected", e instanceof IOException);
- }
-
- table.close();
-
- // Make sure at least the one region that still exists can be found.
- regionInfos = MetaTableAccessor.getTableRegions(connection, tableName);
- assertTrue(regionInfos.size() >= 1);
-
- this.assertExpectedTable(connection, tableName, ROWCOUNT, 2);
- connection.close();
- }
-
- /**
- * Checks that all columns have the expected value and that there is the expected number of rows.
- * @throws IOException
- */
- void assertExpectedTable(final Connection connection, TableName table, int count, int value)
- throws IOException {
- TableDescriptor htd = util.getAdmin().getDescriptor(table);
- assertNotNull(htd);
- try (Table t = connection.getTable(table); ResultScanner sr = t.getScanner(new Scan())) {
- int i = 0;
- for (Result r; (r = sr.next()) != null;) {
- r.getNoVersionMap().values().stream().flatMap(m -> m.values().stream())
- .forEach(v -> assertArrayEquals(value(value), v));
- i++;
- }
- assertEquals(count, i);
- } catch (IOException e) {
- fail("Failed due to exception");
- }
- }
-}
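For reference, the recovery tests deleted above drove the old blocking entry point, LoadIncrementalHFiles.doBulkLoad(dir, admin, table, locator), which obliged callers to hold a Table, RegionLocator and Admin obtained from a blocking Connection. After this change the tool is reached through the BulkLoadHFiles interface, and the BulkLoadHFilesTool implementation manages an AsyncClusterConnection internally. The following is a minimal sketch of the new call pattern; the table name, staging directory and wrapper class are illustrative, and the create(Configuration)/bulkLoad(TableName, Path) signatures are assumed to match what this commit introduces rather than being quoted from it.

    import java.nio.ByteBuffer;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

    public final class BulkLoadHFilesExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Hypothetical target table and staging directory holding per-family HFile subdirectories.
        TableName tableName = TableName.valueOf("bulkload_demo");
        Path hfileDir = new Path("/tmp/bulkload_demo_output");
        // BulkLoadHFiles.create() returns the BulkLoadHFilesTool; no Table, RegionLocator or Admin
        // handles are needed because the tool obtains its cluster connection itself.
        BulkLoadHFiles loader = BulkLoadHFiles.create(conf);
        Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, hfileDir);
        System.out.println("Bulk loaded " + loaded.size() + " HFiles into " + tableName);
      }
    }

Splitting, retry and region lookup are handled inside the tool, which is why the recovery tests could move to the new TestBulkLoadHFilesSplitRecovery suite without carrying the Table/RegionLocator/Admin plumbing along.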
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureBulkLoadHFiles.java
similarity index 88%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureBulkLoadHFiles.java
index e09b9ac..05785b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureBulkLoadHFiles.java
@@ -31,20 +31,20 @@ import org.junit.ClassRule;
import org.junit.experimental.categories.Category;
/**
- * Reruns TestLoadIncrementalHFiles using LoadIncrementalHFiles in secure mode. This suite is unable
+ * Reruns TestBulkLoadHFiles using BulkLoadHFiles in secure mode. This suite is unable
* to verify the security handoff/turnover as miniCluster is running as system user thus has root
* privileges and delegation tokens don't seem to work on miniDFS.
- * <p>
+ * <p/>
* Thus SecureBulkload can only be completely verified by running integration tests against a secure
* cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
* supported as part of a LoadIncrementalFiles call.
*/
@Category({ MiscTests.class, LargeTests.class })
-public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles {
+public class TestSecureBulkLoadHFiles extends TestBulkLoadHFiles {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestSecureLoadIncrementalHFiles.class);
+ HBaseClassTestRule.forClass(TestSecureBulkLoadHFiles.class);
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -53,7 +53,7 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles {
HadoopSecurityEnabledUserProviderForTesting.class);
// setup configuration
SecureTestUtil.enableSecurity(util.getConfiguration());
- util.getConfiguration().setInt(LoadIncrementalHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+ util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
MAX_FILES_PER_REGION_PER_FAMILY);
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
@@ -66,5 +66,4 @@ public class TestSecureLoadIncrementalHFiles extends TestLoadIncrementalHFiles {
setupNamespace();
}
-
}
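The hunk above only switches the constant lookup from LoadIncrementalHFiles to BulkLoadHFiles; the rest of the secure suite's setup is unchanged. As a hedged sketch, the kind of pre-cluster configuration involved looks roughly like the snippet below. The limit value is illustrative, and the codec class name is an assumption based on the truncated util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY, ...) call shown in the hunk, not something quoted from it.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.tool.BulkLoadHFiles;

    public final class SecureBulkLoadTestConfigSketch {
      public static Configuration configure() {
        Configuration conf = HBaseConfiguration.create();
        // Cap how many HFiles a single bulk load may push into one region/family
        // (the value 4 is illustrative, not taken from the test).
        conf.setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY, 4);
        // Return tag values with normal RPCs so assertions can inspect cell tags.
        conf.set(HConstants.RPC_CODEC_CONF_KEY,
          "org.apache.hadoop.hbase.codec.KeyValueCodecWithTags");
        return conf;
      }
    }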
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureBulkLoadHFilesSplitRecovery.java
similarity index 90%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureBulkLoadHFilesSplitRecovery.java
index 03b9380..5943b0d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureLoadIncrementalHFilesSplitRecovery.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestSecureBulkLoadHFilesSplitRecovery.java
@@ -31,21 +31,20 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
- * Reruns TestSecureLoadIncrementalHFilesSplitRecovery using LoadIncrementalHFiles in secure mode.
+ * Reruns TestBulkLoadHFilesSplitRecovery using BulkLoadHFiles in secure mode.
* This suite is unable to verify the security handoff/turnover as miniCluster is running as system
* user thus has root privileges and delegation tokens don't seem to work on miniDFS.
- * <p>
+ * <p/>
* Thus SecureBulkload can only be completely verified by running integration tests against a secure
* cluster. This suite is still invaluable as it verifies the other mechanisms that need to be
* supported as part of a LoadIncrementalFiles call.
*/
@Category({ MiscTests.class, LargeTests.class })
-public class TestSecureLoadIncrementalHFilesSplitRecovery
- extends TestLoadIncrementalHFilesSplitRecovery {
+public class TestSecureBulkLoadHFilesSplitRecovery extends TestBulkLoadHFilesSplitRecovery {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestSecureLoadIncrementalHFilesSplitRecovery.class);
+ HBaseClassTestRule.forClass(TestSecureBulkLoadHFilesSplitRecovery.class);
// This "overrides" the parent static method
// make sure they are in sync