You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/27 03:33:03 UTC

[hbase] branch master updated: HBASE-21782 LoadIncrementalHFiles should not be IA.Public

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 1995f61  HBASE-21782 LoadIncrementalHFiles should not be IA.Public
1995f61 is described below

commit 1995f61d7fe5f44a679674a887055906652ed3b7
Author: zhangduo <zh...@apache.org>
AuthorDate: Sat Jan 26 19:44:10 2019 +0800

    HBASE-21782 LoadIncrementalHFiles should not be IA.Public
    
    Signed-off-by: Michael Stack <st...@apache.org>
---
 .../org/apache/hadoop/hbase/mapreduce/Driver.java  |  13 ++-
 .../apache/hadoop/hbase/tool/BulkLoadHFiles.java   | 100 +++++++++++++++++++++
 .../hadoop/hbase/tool/BulkLoadHFilesTool.java      |  70 +++++++++++++++
 .../hadoop/hbase/tool/LoadIncrementalHFiles.java   |  88 +++++++++---------
 .../hbase/tool/TestLoadIncrementalHFiles.java      |   9 +-
 5 files changed, 224 insertions(+), 56 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index afa1ba7..18f1617 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -19,12 +19,12 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
 import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
 import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
 import org.apache.hadoop.util.ProgramDriver;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
 
 /**
  * Driver for hbase mapreduce jobs. Select which to run by passing
@@ -33,10 +33,7 @@ import org.apache.hadoop.util.ProgramDriver;
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
 @InterfaceStability.Stable
 public class Driver {
-  /**
-   * @param args
-   * @throws Throwable
-   */
+
   public static void main(String[] args) throws Throwable {
     ProgramDriver pgd = new ProgramDriver();
 
@@ -47,7 +44,7 @@ public class Driver {
     pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
     pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
     pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
-    pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class,
+    pgd.addClass(BulkLoadHFilesTool.NAME, BulkLoadHFilesTool.class,
                  "Complete a bulk data load.");
     pgd.addClass(CopyTable.NAME, CopyTable.class,
         "Export a table from local cluster to peer cluster.");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
new file mode 100644
index 0000000..f3d627a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The tool to let you load the output of {@code HFileOutputFormat} into an existing table
+ * programmatically. Not thread safe.
+ */
+@InterfaceAudience.Public
+public interface BulkLoadHFiles {
+
+  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
+  static final String MAX_FILES_PER_REGION_PER_FAMILY =
+    "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
+  static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+  static final String CREATE_TABLE_CONF_KEY = "create.table";
+  static final String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
+  static final String ALWAYS_COPY_FILES = "always.copy.files";
+
+  /**
+   * Represents an HFile waiting to be loaded. An queue is used in this class in order to support
+   * the case where a region has split during the process of the load. When this happens, the HFile
+   * is split into two physical parts across the new region boundary, and each part is added back
+   * into the queue. The import process finishes when the queue is empty.
+   */
+  @InterfaceAudience.Public
+  public static class LoadQueueItem {
+
+    private final byte[] family;
+
+    private final Path hfilePath;
+
+    public LoadQueueItem(byte[] family, Path hfilePath) {
+      this.family = family;
+      this.hfilePath = hfilePath;
+    }
+
+    @Override
+    public String toString() {
+      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
+    }
+
+    public byte[] getFamily() {
+      return family;
+    }
+
+    public Path getFilePath() {
+      return hfilePath;
+    }
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given pre-existing table.
+   * @param tableName the table to load into
+   * @param family2Files map of family to List of hfiles
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Map<byte[], List<Path>> family2Files)
+      throws TableNotFoundException, IOException;
+
+  /**
+   * Perform a bulk load of the given directory into the given pre-existing table.
+   * @param tableName the table to load into
+   * @param dir the directory that was provided as the output path of a job using
+   *          {@code HFileOutputFormat}
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
+      throws TableNotFoundException, IOException;
+
+  static BulkLoadHFiles create(Configuration conf) {
+    return new BulkLoadHFilesTool(conf);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
new file mode 100644
index 0000000..795bd66
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The implementation for {@link BulkLoadHFiles}, and also can be executed from command line as a
+ * tool.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles {
+
+  public static final String NAME = "completebulkload";
+
+  public BulkLoadHFilesTool(Configuration conf) {
+    super(conf);
+  }
+
+  private Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> convert(
+      Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> map) {
+    return map.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+  }
+
+  @Override
+  public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
+      Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException {
+    return convert(run(family2Files, tableName));
+  }
+
+  @Override
+  public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
+      throws TableNotFoundException, IOException {
+    return convert(run(dir, tableName));
+  }
+
+  public static void main(String[] args) throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
+    System.exit(ret);
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 3320b1f..314f2cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.tool;
 
+import static java.lang.String.format;
+
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -46,7 +48,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
-import static java.lang.String.format;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
@@ -87,13 +88,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
 import org.apache.hadoop.hbase.util.FSVisitor;
@@ -104,22 +98,41 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Tool to load the output of HFileOutputFormat into an existing table.
+ * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
+ *             rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
+ *             and {@link #run(String, TableName)}, as all the methods other than them will be
+ *             removed with no replacement.
  */
+@Deprecated
 @InterfaceAudience.Public
 public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);
 
-  public static final String NAME = "completebulkload";
-  static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
+  /**
+   * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
+   *             depend on this value.
+   */
+  @Deprecated
+  public static final String NAME = BulkLoadHFilesTool.NAME;
+  static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
   public static final String MAX_FILES_PER_REGION_PER_FAMILY =
-      "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
-  private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
-  public final static String CREATE_TABLE_CONF_KEY = "create.table";
-  public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
-  public final static String ALWAYS_COPY_FILES = "always.copy.files";
+    BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
+  private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
+  public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
+  public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
+    BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
+  public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;
 
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
@@ -142,28 +155,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * the case where a region has split during the process of the load. When this happens, the HFile
    * is split into two physical parts across the new region boundary, and each part is added back
    * into the queue. The import process finishes when the queue is empty.
+   * @deprecated Use {@link BulkLoadHFiles} instead.
    */
   @InterfaceAudience.Public
-  public static class LoadQueueItem {
-    private final byte[] family;
-    private final Path hfilePath;
+  @Deprecated
+  public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {
 
     public LoadQueueItem(byte[] family, Path hfilePath) {
-      this.family = family;
-      this.hfilePath = hfilePath;
-    }
-
-    @Override
-    public String toString() {
-      return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
-    }
-
-    public byte[] getFamily() {
-      return family;
-    }
-
-    public Path getFilePath() {
-      return hfilePath;
+      super(family, hfilePath);
     }
   }
 
@@ -825,8 +824,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    * If the table is created for the first time, then "completebulkload" reads the files twice. More
    * modifications necessary if we want to avoid doing it.
    */
-  private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
-    final Path hfofDir = new Path(dirPath);
+  private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
     final FileSystem fs = hfofDir.getFileSystem(getConf());
 
     // Add column families
@@ -1148,13 +1146,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     return getConf().getBoolean(ALWAYS_COPY_FILES, false);
   }
 
-  /**
-   * Perform bulk load on the given table.
-   * @param hfofDir the directory that was provided as the output path of a job using
-   *          HFileOutputFormat
-   * @param tableName the table to load into
-   */
-  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+  protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
       throws IOException {
     try (Connection connection = ConnectionFactory.createConnection(getConf());
         Admin admin = connection.getAdmin()) {
@@ -1169,11 +1161,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
       }
       try (Table table = connection.getTable(tableName);
           RegionLocator locator = connection.getRegionLocator(tableName)) {
-        return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(),
+        return doBulkLoad(hfofDir, admin, table, locator, isSilence(),
             isAlwaysCopyFiles());
       }
     }
   }
+  /**
+   * Perform bulk load on the given table.
+   * @param hfofDir the directory that was provided as the output path of a job using
+   *          HFileOutputFormat
+   * @param tableName the table to load into
+   */
+  public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+      throws IOException {
+    return run(new Path(hfofDir), tableName);
+  }
 
   /**
    * Perform bulk load on the given table.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
index 85235b6..129823e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.regionserver.BloomType;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
@@ -346,7 +345,7 @@ public class TestLoadIncrementalHFiles {
     if (copyFiles) {
       conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
     }
-    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+    BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
     List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
     if (depth == 3) {
       args.add("-loadTable");
@@ -356,17 +355,17 @@ public class TestLoadIncrementalHFiles {
       if (deleteFile) {
         fs.delete(last, true);
       }
-      Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
+      Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
       if (deleteFile) {
         expectedRows -= 1000;
-        for (LoadQueueItem item : loaded.keySet()) {
+        for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
           if (item.getFilePath().getName().equals(last.getName())) {
             fail(last + " should be missing");
           }
         }
       }
     } else {
-      loader.run(args.toArray(new String[]{}));
+      loader.run(args.toArray(new String[] {}));
     }
 
     if (copyFiles) {