You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/08/30 10:02:16 UTC
hbase git commit: HBASE-16509 Add option to LoadIncrementalHFiles which allows skipping unmatched column families
Repository: hbase
Updated Branches:
refs/heads/master 0d05c7518 -> 9cb0094bd
HBASE-16509 Add option to LoadIncrementalHFiles which allows skipping unmatched column families
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9cb0094b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9cb0094b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9cb0094b
Branch: refs/heads/master
Commit: 9cb0094bdbecf95455388278430833188bf021ef
Parents: 0d05c75
Author: tedyu <yu...@gmail.com>
Authored: Tue Aug 30 03:00:59 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Aug 30 03:00:59 2016 -0700
----------------------------------------------------------------------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 62 ++++++++++++++++----
1 file changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9cb0094b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index a34dc0a..6978e23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -114,6 +114,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
= "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
public final static String CREATE_TABLE_CONF_KEY = "create.table";
+ public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
// We use a '.' prefix which is ignored when walking directory trees
// above. It is invalid family name.
@@ -121,6 +122,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private int maxFilesPerRegionPerFamily;
private boolean assignSeqIds;
+ private Set<String> unmatchedFamilies = new HashSet<String>();
// Source filesystem
private FileSystem fs;
@@ -160,7 +162,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private void usage() {
System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
+ CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
- + " Note: if you set this to 'no', then the target table must already exist in HBase\n"
+ + " Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+ + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
+ "\n");
}
@@ -308,11 +311,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*
* @param hfofDir the directory that was provided as the output path
* of a job using HFileOutputFormat
+ * @param admin the Admin
* @param table the table to load into
+ * @param regionLocator region locator
* @throws TableNotFoundException if table does not yet exist
*/
public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
- RegionLocator regionLocator) throws TableNotFoundException, IOException {
+ RegionLocator regionLocator) throws TableNotFoundException, IOException {
+ doBulkLoad(hfofDir, admin, table, regionLocator, false);
+ }
+
+ /**
+ * Perform a bulk load of the given directory into the given
+ * pre-existing table. This method is not threadsafe.
+ *
+ * @param hfofDir the directory that was provided as the output path
+ * of a job using HFileOutputFormat
+ * @param admin the Admin
+ * @param table the table to load into
+ * @param regionLocator region locator
+ * @param silence true to ignore unmatched column families
+ * @throws TableNotFoundException if table does not yet exist
+ */
+ public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
+ RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
if (!admin.isTableAvailable(regionLocator.getName())) {
throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
@@ -337,7 +359,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
"option, consider removing the files and bulkload again without this option. " +
"See HBASE-13985");
}
- prepareHFileQueue(hfofDir, table, queue, validateHFile);
+ prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
int count = 0;
@@ -429,8 +451,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
*/
public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
boolean validateHFile) throws IOException {
+ prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+ }
+
+ /**
+ * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+ * passed directory and validates whether the prepared queue has all the valid table column
+ * families in it.
+ * @param hfilesDir directory containing list of hfiles to be loaded into the table
+ * @param table table to which hfiles should be loaded
+ * @param queue queue which needs to be loaded into the table
+ * @param validateHFile if true hfiles will be validated for its format
+ * @param silence true to ignore unmatched column families
+ * @throws IOException If any I/O or network error occurred
+ */
+ public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+ boolean validateHFile, boolean silence) throws IOException {
discoverLoadQueue(queue, hfilesDir, validateHFile);
- validateFamiliesInHFiles(table, queue);
+ validateFamiliesInHFiles(table, queue, silence);
}
// Initialize a thread pool
@@ -446,14 +484,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
/**
* Checks whether there is any invalid family name in HFiles to be bulk loaded.
*/
- private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+ private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
throws IOException {
Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
List<String> familyNames = new ArrayList<>(families.size());
for (HColumnDescriptor family : families) {
familyNames.add(family.getNameAsString());
}
- List<String> unmatchedFamilies = new ArrayList<String>();
Iterator<LoadQueueItem> queueIter = queue.iterator();
while (queueIter.hasNext()) {
LoadQueueItem lqi = queueIter.next();
@@ -468,7 +505,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
+ unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
+ familyNames;
LOG.error(msg);
- throw new IOException(msg);
+ if (!silence) throw new IOException(msg);
}
}
@@ -781,7 +818,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
throws IOException {
final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
for (LoadQueueItem lqi : lqis) {
- famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+ if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
+ famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+ }
}
final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
rpcControllerFactory, tableName, first) {
@@ -1036,7 +1075,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- if (args.length != 2) {
+ if (args.length < 2) {
usage();
return -1;
}
@@ -1062,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
try (Table table = connection.getTable(tableName);
RegionLocator locator = connection.getRegionLocator(tableName)) {
- doBulkLoad(hfofDir, admin, table, locator);
+ boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+ doBulkLoad(hfofDir, admin, table, locator, silence);
}
}
@@ -1087,4 +1127,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
public void setBulkToken(String stagingDir) {
this.bulkToken = stagingDir;
}
-}
\ No newline at end of file
+}