You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/27 03:33:03 UTC
[hbase] branch master updated: HBASE-21782 LoadIncrementalHFiles
should not be IA.Public
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 1995f61 HBASE-21782 LoadIncrementalHFiles should not be IA.Public
1995f61 is described below
commit 1995f61d7fe5f44a679674a887055906652ed3b7
Author: zhangduo <zh...@apache.org>
AuthorDate: Sat Jan 26 19:44:10 2019 +0800
HBASE-21782 LoadIncrementalHFiles should not be IA.Public
Signed-off-by: Michael Stack <st...@apache.org>
---
.../org/apache/hadoop/hbase/mapreduce/Driver.java | 13 ++-
.../apache/hadoop/hbase/tool/BulkLoadHFiles.java | 100 +++++++++++++++++++++
.../hadoop/hbase/tool/BulkLoadHFilesTool.java | 70 +++++++++++++++
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 88 +++++++++---------
.../hbase/tool/TestLoadIncrementalHFiles.java | 9 +-
5 files changed, 224 insertions(+), 56 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
index afa1ba7..18f1617 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/Driver.java
@@ -19,12 +19,12 @@
package org.apache.hadoop.hbase.mapreduce;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication;
import org.apache.hadoop.hbase.snapshot.ExportSnapshot;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.tool.BulkLoadHFilesTool;
import org.apache.hadoop.util.ProgramDriver;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
/**
* Driver for hbase mapreduce jobs. Select which to run by passing
@@ -33,10 +33,7 @@ import org.apache.hadoop.util.ProgramDriver;
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
@InterfaceStability.Stable
public class Driver {
- /**
- * @param args
- * @throws Throwable
- */
+
public static void main(String[] args) throws Throwable {
ProgramDriver pgd = new ProgramDriver();
@@ -47,7 +44,7 @@ public class Driver {
pgd.addClass(Export.NAME, Export.class, "Write table data to HDFS.");
pgd.addClass(Import.NAME, Import.class, "Import data written by Export.");
pgd.addClass(ImportTsv.NAME, ImportTsv.class, "Import data in TSV format.");
- pgd.addClass(LoadIncrementalHFiles.NAME, LoadIncrementalHFiles.class,
+ pgd.addClass(BulkLoadHFilesTool.NAME, BulkLoadHFilesTool.class,
"Complete a bulk data load.");
pgd.addClass(CopyTable.NAME, CopyTable.class,
"Export a table from local cluster to peer cluster.");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
new file mode 100644
index 0000000..f3d627a
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFiles.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The tool to let you load the output of {@code HFileOutputFormat} into an existing table
+ * programmatically. Not thread safe.
+ */
+@InterfaceAudience.Public
+public interface BulkLoadHFiles {
+
+ static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
+ static final String MAX_FILES_PER_REGION_PER_FAMILY =
+ "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
+ static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
+ static final String CREATE_TABLE_CONF_KEY = "create.table";
+ static final String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
+ static final String ALWAYS_COPY_FILES = "always.copy.files";
+
+ /**
+ * Represents an HFile waiting to be loaded. A queue is used in this class in order to support
+ * the case where a region has split during the process of the load. When this happens, the HFile
+ * is split into two physical parts across the new region boundary, and each part is added back
+ * into the queue. The import process finishes when the queue is empty.
+ */
+ @InterfaceAudience.Public
+ public static class LoadQueueItem {
+
+ private final byte[] family;
+
+ private final Path hfilePath;
+
+ public LoadQueueItem(byte[] family, Path hfilePath) {
+ this.family = family;
+ this.hfilePath = hfilePath;
+ }
+
+ @Override
+ public String toString() {
+ return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
+ }
+
+ public byte[] getFamily() {
+ return family;
+ }
+
+ public Path getFilePath() {
+ return hfilePath;
+ }
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given pre-existing table.
+ * @param tableName the table to load into
+ * @param family2Files map of family to List of hfiles
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Map<byte[], List<Path>> family2Files)
+ throws TableNotFoundException, IOException;
+
+ /**
+ * Perform a bulk load of the given directory into the given pre-existing table.
+ * @param tableName the table to load into
+ * @param dir the directory that was provided as the output path of a job using
+ * {@code HFileOutputFormat}
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ Map<LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
+ throws TableNotFoundException, IOException;
+
+ static BulkLoadHFiles create(Configuration conf) {
+ return new BulkLoadHFilesTool(conf);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
new file mode 100644
index 0000000..795bd66
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/BulkLoadHFilesTool.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.tool;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * The implementation of {@link BulkLoadHFiles}; it can also be executed from the command line
+ * as a tool.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public class BulkLoadHFilesTool extends LoadIncrementalHFiles implements BulkLoadHFiles {
+
+ public static final String NAME = "completebulkload";
+
+ public BulkLoadHFilesTool(Configuration conf) {
+ super(conf);
+ }
+
+ private Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> convert(
+ Map<LoadIncrementalHFiles.LoadQueueItem, ByteBuffer> map) {
+ return map.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
+ }
+
+ @Override
+ public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName,
+ Map<byte[], List<Path>> family2Files) throws TableNotFoundException, IOException {
+ return convert(run(family2Files, tableName));
+ }
+
+ @Override
+ public Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> bulkLoad(TableName tableName, Path dir)
+ throws TableNotFoundException, IOException {
+ return convert(run(dir, tableName));
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ int ret = ToolRunner.run(conf, new BulkLoadHFilesTool(conf), args);
+ System.exit(ret);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index 3320b1f..314f2cb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.tool;
+import static java.lang.String.format;
+
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -46,7 +48,6 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
-import static java.lang.String.format;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
@@ -87,13 +88,6 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
-import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
import org.apache.hadoop.hbase.util.FSVisitor;
@@ -104,22 +98,41 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.collect.HashMultimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimap;
+import org.apache.hbase.thirdparty.com.google.common.collect.Multimaps;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* Tool to load the output of HFileOutputFormat into an existing table.
+ * @deprecated since 2.2.0, will be removed in 3.0.0. Use {@link BulkLoadHFiles} instead. Please
+ * rewrite your code if you rely on methods other than the {@link #run(Map, TableName)}
+ * and {@link #run(String, TableName)}, as all the methods other than them will be
+ * removed with no replacement.
*/
+@Deprecated
@InterfaceAudience.Public
public class LoadIncrementalHFiles extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(LoadIncrementalHFiles.class);
- public static final String NAME = "completebulkload";
- static final String RETRY_ON_IO_EXCEPTION = "hbase.bulkload.retries.retryOnIOException";
+ /**
+ * @deprecated since 2.2.0, will be removed in 3.0.0, with no replacement. End user should not
+ * depend on this value.
+ */
+ @Deprecated
+ public static final String NAME = BulkLoadHFilesTool.NAME;
+ static final String RETRY_ON_IO_EXCEPTION = BulkLoadHFiles.RETRY_ON_IO_EXCEPTION;
public static final String MAX_FILES_PER_REGION_PER_FAMILY =
- "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
- private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
- public final static String CREATE_TABLE_CONF_KEY = "create.table";
- public final static String IGNORE_UNMATCHED_CF_CONF_KEY = "ignore.unmatched.families";
- public final static String ALWAYS_COPY_FILES = "always.copy.files";
+ BulkLoadHFiles.MAX_FILES_PER_REGION_PER_FAMILY;
+ private static final String ASSIGN_SEQ_IDS = BulkLoadHFiles.ASSIGN_SEQ_IDS;
+ public final static String CREATE_TABLE_CONF_KEY = BulkLoadHFiles.CREATE_TABLE_CONF_KEY;
+ public final static String IGNORE_UNMATCHED_CF_CONF_KEY =
+ BulkLoadHFiles.IGNORE_UNMATCHED_CF_CONF_KEY;
+ public final static String ALWAYS_COPY_FILES = BulkLoadHFiles.ALWAYS_COPY_FILES;
// We use a '.' prefix which is ignored when walking directory trees
// above. It is invalid family name.
@@ -142,28 +155,14 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* the case where a region has split during the process of the load. When this happens, the HFile
* is split into two physical parts across the new region boundary, and each part is added back
* into the queue. The import process finishes when the queue is empty.
+ * @deprecated Use {@link BulkLoadHFiles} instead.
*/
@InterfaceAudience.Public
- public static class LoadQueueItem {
- private final byte[] family;
- private final Path hfilePath;
+ @Deprecated
+ public static class LoadQueueItem extends BulkLoadHFiles.LoadQueueItem {
public LoadQueueItem(byte[] family, Path hfilePath) {
- this.family = family;
- this.hfilePath = hfilePath;
- }
-
- @Override
- public String toString() {
- return "family:" + Bytes.toString(family) + " path:" + hfilePath.toString();
- }
-
- public byte[] getFamily() {
- return family;
- }
-
- public Path getFilePath() {
- return hfilePath;
+ super(family, hfilePath);
}
}
@@ -825,8 +824,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
* If the table is created for the first time, then "completebulkload" reads the files twice. More
* modifications necessary if we want to avoid doing it.
*/
- private void createTable(TableName tableName, String dirPath, Admin admin) throws IOException {
- final Path hfofDir = new Path(dirPath);
+ private void createTable(TableName tableName, Path hfofDir, Admin admin) throws IOException {
final FileSystem fs = hfofDir.getFileSystem(getConf());
// Add column families
@@ -1148,13 +1146,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
return getConf().getBoolean(ALWAYS_COPY_FILES, false);
}
- /**
- * Perform bulk load on the given table.
- * @param hfofDir the directory that was provided as the output path of a job using
- * HFileOutputFormat
- * @param tableName the table to load into
- */
- public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+ protected final Map<LoadQueueItem, ByteBuffer> run(Path hfofDir, TableName tableName)
throws IOException {
try (Connection connection = ConnectionFactory.createConnection(getConf());
Admin admin = connection.getAdmin()) {
@@ -1169,11 +1161,21 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
- return doBulkLoad(new Path(hfofDir), admin, table, locator, isSilence(),
+ return doBulkLoad(hfofDir, admin, table, locator, isSilence(),
isAlwaysCopyFiles());
}
}
}
+ /**
+ * Perform bulk load on the given table.
+ * @param hfofDir the directory that was provided as the output path of a job using
+ * HFileOutputFormat
+ * @param tableName the table to load into
+ */
+ public Map<LoadQueueItem, ByteBuffer> run(String hfofDir, TableName tableName)
+ throws IOException {
+ return run(new Path(hfofDir), tableName);
+ }
/**
* Perform bulk load on the given table.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
index 85235b6..129823e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestLoadIncrementalHFiles.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
-import org.apache.hadoop.hbase.tool.LoadIncrementalHFiles.LoadQueueItem;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.HFileTestUtil;
@@ -346,7 +345,7 @@ public class TestLoadIncrementalHFiles {
if (copyFiles) {
conf.setBoolean(LoadIncrementalHFiles.ALWAYS_COPY_FILES, true);
}
- LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
+ BulkLoadHFilesTool loader = new BulkLoadHFilesTool(conf);
List<String> args = Lists.newArrayList(baseDirectory.toString(), tableName.toString());
if (depth == 3) {
args.add("-loadTable");
@@ -356,17 +355,17 @@ public class TestLoadIncrementalHFiles {
if (deleteFile) {
fs.delete(last, true);
}
- Map<LoadQueueItem, ByteBuffer> loaded = loader.run(map, tableName);
+ Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> loaded = loader.bulkLoad(tableName, map);
if (deleteFile) {
expectedRows -= 1000;
- for (LoadQueueItem item : loaded.keySet()) {
+ for (BulkLoadHFiles.LoadQueueItem item : loaded.keySet()) {
if (item.getFilePath().getName().equals(last.getName())) {
fail(last + " should be missing");
}
}
}
} else {
- loader.run(args.toArray(new String[]{}));
+ loader.run(args.toArray(new String[] {}));
}
if (copyFiles) {