You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/04/30 10:33:50 UTC
[hbase] branch master updated: HBASE-24221 Support bulkLoadHFile by
family (#1569)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 512d00e HBASE-24221 Support bulkLoadHFile by family (#1569)
512d00e is described below
commit 512d00e75db2bc0af52b164c142e9cf992b8e899
Author: niuyulin <ny...@163.com>
AuthorDate: Thu Apr 30 05:33:37 2020 -0500
HBASE-24221 Support bulkLoadHFile by family (#1569)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../hadoop/hbase/tool/BulkLoadHFilesTool.java | 109 ++++++++++++++-------
.../hadoop/hbase/tool/TestBulkLoadHFiles.java | 1 +
.../hbase/tool/TestBulkLoadHFilesByFamily.java | 45 +++++++++
3 files changed, 118 insertions(+), 37 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 2b67489..b42fbbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -119,6 +119,11 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
* Whether to run validation on hfiles before loading.
*/
private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
+ /**
+ * HBASE-24221 Support bulk loading HFiles family by family, to avoid bulkLoadHFile waiting a
+ * long time because of compactions on the server side.
+ */
+ public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
// We use a '.' prefix which is ignored when walking directory trees
// above. It is invalid family name.
@@ -126,6 +131,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
private final int maxFilesPerRegionPerFamily;
private final boolean assignSeqIds;
+ private boolean bulkLoadByFamily;
// Source delegation token
private final FsDelegationToken fsDelegationToken;
@@ -146,8 +152,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
- nrThreads =
- conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
+ nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+ Runtime.getRuntime().availableProcessors());
+ bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
}
// Initialize a thread pool
@@ -363,6 +370,54 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
}
/**
+ * Attempts an atomic load of many hfiles into a region. The returned future completes with an
+ * empty list on success, with {@code lqis} when the load should be retried, and exceptionally
+ * when an error occurs and retrying is disabled or exhausted. NOTE: To maintain row atomicity
+ * guarantees, the region server side should succeed atomically or fail atomically.
+ * @return empty list if success, list of items to retry on recoverable failure
+ */
+ private CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
+ final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
+ final byte[] first, Collection<LoadQueueItem> lqis) {
+ List<Pair<byte[], String>> familyPaths = // one (family, hfile path) pair per queued item
+ lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
+ .collect(Collectors.toList());
+ CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
+ FutureUtils
+ .addListener(
+ conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
+ fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
+ (loaded, error) -> {
+ if (error != null) { // retried only if RETRY_ON_IO_EXCEPTION is set and retries remain
+ LOG.error("Encountered unrecoverable error from region server", error);
+ if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
+ && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+ LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
+ + numRetries.incrementAndGet());
+ // complete normally with the same lqis so that they are retried
+ future.complete(lqis);
+ } else {
+ LOG.error(RETRY_ON_IO_EXCEPTION
+ + " is disabled or we have reached retry limit. Unable to recover");
+ future.completeExceptionally(error);
+ }
+ } else {
+ if (loaded) {
+ future.complete(Collections.emptyList());
+ } else {
+ LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
+ + " into table " + tableName + " with files " + lqis
+ + " failed. This is recoverable and they will be retried.");
+ // no error but nothing was loaded: hand the same lqis back for a retry
+ future.complete(lqis);
+ }
+ }
+ });
+ return future;
+ }
+
+ /**
* This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
* re-queued for another pass with the groupOrSplitPhase.
* <p/>
@@ -375,43 +430,15 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
// atomically bulk load the groups.
List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
- .entrySet()) {
+ .entrySet()) {
byte[] first = entry.getKey().array();
final Collection<LoadQueueItem> lqis = entry.getValue();
- List<Pair<byte[], String>> familyPaths =
- lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
- .collect(Collectors.toList());
- CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
- FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
- fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
- (loaded, error) -> {
- if (error != null) {
- LOG.error("Encountered unrecoverable error from region server", error);
- if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
- numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
- LOG.warn("Will attempt to retry loading failed HFiles. Retry #" +
- numRetries.incrementAndGet());
- // return lqi's to retry
- future.complete(lqis);
- } else {
- LOG.error(RETRY_ON_IO_EXCEPTION +
- " is disabled or we have reached retry limit. Unable to recover");
- future.completeExceptionally(error);
- }
- } else {
- if (loaded) {
- future.complete(Collections.emptyList());
- } else {
- LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
- " into table " + tableName + " with files " + lqis +
- " failed. This is recoverable and they will be retried.");
- // return lqi's to retry
- future.complete(lqis);
- }
- }
- });
- loadingFutures.add(future);
+ if (bulkLoadByFamily) {
+ groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
+ .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
+ } else {
+ loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
+ }
if (item2RegionMap != null) {
for (LoadQueueItem lqi : lqis) {
item2RegionMap.put(lqi, entry.getKey());
@@ -447,6 +474,14 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
}
}
+ private Map<byte[], Collection<LoadQueueItem>>
+ groupByFamilies(Collection<LoadQueueItem> itemsInRegion) { // buckets the items per family
+ Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ itemsInRegion.forEach(item -> families2Queue // a family's list is created on first use
+ .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
+ return families2Queue; // keys are compared with Bytes.BYTES_COMPARATOR, not by reference
+ }
+
private boolean checkHFilesCountPerRegionPerFamily(
final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
index 9642b8f..6d0e914 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
@@ -102,6 +102,7 @@ public class TestBulkLoadHFiles {
// change default behavior so that tag values are returned with normal rpcs
util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
KeyValueCodecWithTags.class.getCanonicalName());
+ util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
util.startMiniCluster();
setupNamespace();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
new file mode 100644
index 0000000..42a81a9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+
+public class TestBulkLoadHFilesByFamily extends TestBulkLoadHFiles { // by-family variant
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestBulkLoadHFilesByFamily.class);
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception { // configures, then starts the cluster
+ util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+ util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+ MAX_FILES_PER_REGION_PER_FAMILY);
+ // change default behavior so that tag values are returned with normal rpcs
+ util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+ KeyValueCodecWithTags.class.getCanonicalName());
+ util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+ util.startMiniCluster(); // started only after BULK_LOAD_HFILES_BY_FAMILY is set to true
+ setupNamespace();
+ }
+}