Posted to commits@hbase.apache.org by zg...@apache.org on 2020/04/30 10:33:50 UTC

[hbase] branch master updated: HBASE-24221 Support bulkLoadHFile by family (#1569)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 512d00e  HBASE-24221 Support bulkLoadHFile by family (#1569)
512d00e is described below

commit 512d00e75db2bc0af52b164c142e9cf992b8e899
Author: niuyulin <ny...@163.com>
AuthorDate: Thu Apr 30 05:33:37 2020 -0500

    HBASE-24221 Support bulkLoadHFile by family (#1569)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      | 109 ++++++++++++++-------
 .../hadoop/hbase/tool/TestBulkLoadHFiles.java      |   1 +
 .../hbase/tool/TestBulkLoadHFilesByFamily.java     |  45 +++++++++
 3 files changed, 118 insertions(+), 37 deletions(-)
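
For readers who want to try the new switch: below is a minimal sketch (not part of
this commit) of enabling it from client code. The table name "mytable" and the
staging directory "/staging/mytable" are placeholders, and the usual bulk-load
layout of one subdirectory of HFiles per column family is assumed.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;

    public class BulkLoadByFamilyExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Fan each region's bulk load out into one RPC per column family, so a
        // family that is slow on the server side does not hold up the others.
        conf.setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
        // The flag is read in the tool's constructor (see the diff below), so it
        // must be set on the Configuration before the tool is built.
        BulkLoadHFilesTool tool = new BulkLoadHFilesTool(conf);
        tool.bulkLoad(TableName.valueOf("mytable"), new Path("/staging/mytable"));
      }
    }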

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
index 2b67489..b42fbbe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -119,6 +119,11 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
    * Whether to run validation on hfiles before loading.
    */
   private static final String VALIDATE_HFILES = "hbase.loadincremental.validate.hfile";
+  /**
+   * HBASE-24221 Support bulkLoadHFile by family, to avoid a long bulkLoadHFile wait caused by
+   * compaction on the server side
+   */
+  public static final String BULK_LOAD_HFILES_BY_FAMILY = "hbase.mapreduce.bulkload.by.family";
 
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
@@ -126,6 +131,7 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
 
   private final int maxFilesPerRegionPerFamily;
   private final boolean assignSeqIds;
+  private boolean bulkLoadByFamily;
 
   // Source delegation token
   private final FsDelegationToken fsDelegationToken;
@@ -146,8 +152,9 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     fsDelegationToken = new FsDelegationToken(userProvider, "renewer");
     assignSeqIds = conf.getBoolean(ASSIGN_SEQ_IDS, true);
     maxFilesPerRegionPerFamily = conf.getInt(MAX_FILES_PER_REGION_PER_FAMILY, 32);
-    nrThreads =
-      conf.getInt("hbase.loadincremental.threads.max", Runtime.getRuntime().availableProcessors());
+    nrThreads = conf.getInt("hbase.loadincremental.threads.max",
+      Runtime.getRuntime().availableProcessors());
+    bulkLoadByFamily = conf.getBoolean(BULK_LOAD_HFILES_BY_FAMILY, false);
   }
 
   // Initialize a thread pool
@@ -363,6 +370,54 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
   }
 
   /**
+   * Attempts to do an atomic load of many hfiles into a region. If it fails, it returns a list of
+   * hfiles that need to be retried. If it is successful it will return an empty list. NOTE: To
+   * maintain row atomicity guarantees, the region server side should succeed atomically and
+   * fail atomically.
+   * @return empty list if success, list of items to retry on recoverable failure
+   */
+  private CompletableFuture<Collection<LoadQueueItem>> tryAtomicRegionLoad(
+      final AsyncClusterConnection conn, final TableName tableName, boolean copyFiles,
+      final byte[] first, Collection<LoadQueueItem> lqis) {
+    List<Pair<byte[], String>> familyPaths =
+        lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
+            .collect(Collectors.toList());
+    CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
+    FutureUtils
+        .addListener(
+          conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
+            fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
+          (loaded, error) -> {
+            if (error != null) {
+              LOG.error("Encountered unrecoverable error from region server", error);
+              if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false)
+                  && numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+                    HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
+                LOG.warn("Will attempt to retry loading failed HFiles. Retry #"
+                    + numRetries.incrementAndGet());
+                // return lqi's to retry
+                future.complete(lqis);
+              } else {
+                LOG.error(RETRY_ON_IO_EXCEPTION
+                    + " is disabled or we have reached retry limit. Unable to recover");
+                future.completeExceptionally(error);
+              }
+            } else {
+              if (loaded) {
+                future.complete(Collections.emptyList());
+              } else {
+                LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first)
+                    + " into table " + tableName + " with files " + lqis
+                    + " failed.  This is recoverable and they will be retried.");
+                // return lqi's to retry
+                future.complete(lqis);
+              }
+            }
+          });
+    return future;
+  }
+
+  /**
    * This takes the LQI's grouped by likely regions and attempts to bulk load them. Any failures are
    * re-queued for another pass with the groupOrSplitPhase.
    * <p/>
@@ -375,43 +430,15 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     // atomically bulk load the groups.
     List<Future<Collection<LoadQueueItem>>> loadingFutures = new ArrayList<>();
     for (Entry<ByteBuffer, ? extends Collection<LoadQueueItem>> entry : regionGroups.asMap()
-      .entrySet()) {
+        .entrySet()) {
       byte[] first = entry.getKey().array();
       final Collection<LoadQueueItem> lqis = entry.getValue();
-      List<Pair<byte[], String>> familyPaths =
-        lqis.stream().map(lqi -> Pair.newPair(lqi.getFamily(), lqi.getFilePath().toString()))
-          .collect(Collectors.toList());
-      CompletableFuture<Collection<LoadQueueItem>> future = new CompletableFuture<>();
-      FutureUtils.addListener(conn.bulkLoad(tableName, familyPaths, first, assignSeqIds,
-        fsDelegationToken.getUserToken(), bulkToken, copyFiles, clusterIds, replicate),
-        (loaded, error) -> {
-          if (error != null) {
-            LOG.error("Encountered unrecoverable error from region server", error);
-            if (getConf().getBoolean(RETRY_ON_IO_EXCEPTION, false) &&
-              numRetries.get() < getConf().getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-                HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER)) {
-              LOG.warn("Will attempt to retry loading failed HFiles. Retry #" +
-                numRetries.incrementAndGet());
-              // return lqi's to retry
-              future.complete(lqis);
-            } else {
-              LOG.error(RETRY_ON_IO_EXCEPTION +
-                " is disabled or we have reached retry limit. Unable to recover");
-              future.completeExceptionally(error);
-            }
-          } else {
-            if (loaded) {
-              future.complete(Collections.emptyList());
-            } else {
-              LOG.warn("Attempt to bulk load region containing " + Bytes.toStringBinary(first) +
-                " into table " + tableName + " with files " + lqis +
-                " failed.  This is recoverable and they will be retried.");
-              // return lqi's to retry
-              future.complete(lqis);
-            }
-          }
-        });
-      loadingFutures.add(future);
+      if (bulkLoadByFamily) {
+        groupByFamilies(lqis).values().forEach(familyQueue -> loadingFutures
+            .add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, familyQueue)));
+      } else {
+        loadingFutures.add(tryAtomicRegionLoad(conn, tableName, copyFiles, first, lqis));
+      }
       if (item2RegionMap != null) {
         for (LoadQueueItem lqi : lqis) {
           item2RegionMap.put(lqi, entry.getKey());
@@ -447,6 +474,14 @@ public class BulkLoadHFilesTool extends Configured implements BulkLoadHFiles, To
     }
   }
 
+  private Map<byte[], Collection<LoadQueueItem>>
+      groupByFamilies(Collection<LoadQueueItem> itemsInRegion) {
+    Map<byte[], Collection<LoadQueueItem>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    itemsInRegion.forEach(item -> families2Queue
+        .computeIfAbsent(item.getFamily(), queue -> new ArrayList<>()).add(item));
+    return families2Queue;
+  }
+
   private boolean checkHFilesCountPerRegionPerFamily(
       final Multimap<ByteBuffer, LoadQueueItem> regionGroups) {
     for (Map.Entry<ByteBuffer, Collection<LoadQueueItem>> e : regionGroups.asMap().entrySet()) {
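
A side note on groupByFamilies above: it keys the map on raw byte[] family names,
which only works because the TreeMap consults Bytes.BYTES_COMPARATOR; a HashMap
keyed on byte[] would compare arrays by identity and never merge equal family
names. A self-contained sketch of the same pattern (the cf1/cf2 paths are invented):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.hadoop.hbase.util.Bytes;

    public class GroupByFamilySketch {
      public static void main(String[] args) {
        Map<byte[], Collection<String>> families2Queue = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        List<String> paths = Arrays.asList("cf1/hfile-a", "cf2/hfile-b", "cf1/hfile-c");
        for (String path : paths) {
          // Derive the family from the parent directory, as the bulk-load layout does.
          byte[] family = Bytes.toBytes(path.substring(0, path.indexOf('/')));
          families2Queue.computeIfAbsent(family, k -> new ArrayList<>()).add(path);
        }
        // Prints: cf1 -> [cf1/hfile-a, cf1/hfile-c]  then  cf2 -> [cf2/hfile-b]
        families2Queue.forEach(
          (family, queue) -> System.out.println(Bytes.toString(family) + " -> " + queue));
      }
    }
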
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
index 9642b8f..6d0e914 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFiles.java
@@ -102,6 +102,7 @@ public class TestBulkLoadHFiles {
     // change default behavior so that tag values are returned with normal rpcs
     util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
       KeyValueCodecWithTags.class.getCanonicalName());
+    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, false);
     util.startMiniCluster();
 
     setupNamespace();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
new file mode 100644
index 0000000..42a81a9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestBulkLoadHFilesByFamily.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+
+public class TestBulkLoadHFilesByFamily extends TestBulkLoadHFiles {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestBulkLoadHFilesByFamily.class);
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    util.getConfiguration().set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, "");
+    util.getConfiguration().setInt(BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY,
+      MAX_FILES_PER_REGION_PER_FAMILY);
+    // change default behavior so that tag values are returned with normal rpcs
+    util.getConfiguration().set(HConstants.RPC_CODEC_CONF_KEY,
+      KeyValueCodecWithTags.class.getCanonicalName());
+    util.getConfiguration().setBoolean(BulkLoadHFilesTool.BULK_LOAD_HFILES_BY_FAMILY, true);
+    util.startMiniCluster();
+    setupNamespace();
+  }
+}
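
One consequence worth noting (my reading of the change, not something the commit
states): with hbase.mapreduce.bulkload.by.family enabled, each family group becomes
its own bulkLoad RPC, so a multi-family region is no longer loaded atomically; one
family's files can land while another family's are still being retried. A toy
illustration of the fan-out into independent per-family futures:

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;

    public class PerFamilyFanOutSketch {
      public static void main(String[] args) {
        Map<String, List<String>> byFamily = new TreeMap<>();
        byFamily.put("cf1", Arrays.asList("hfile-a", "hfile-c"));
        byFamily.put("cf2", Arrays.asList("hfile-b"));
        // One independent future per family; a failure in cf1 would not undo cf2.
        List<CompletableFuture<Void>> futures = byFamily.entrySet().stream()
          .map(e -> CompletableFuture.runAsync(
            () -> System.out.println("loading " + e.getKey() + ": " + e.getValue())))
          .collect(Collectors.toList());
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
      }
    }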