You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/08/30 10:02:16 UTC

hbase git commit: HBASE-16509 Add option to LoadIncrementalHFiles which allows skipping unmatched column families

Repository: hbase
Updated Branches:
  refs/heads/master 0d05c7518 -> 9cb0094bd


HBASE-16509 Add option to LoadIncrementalHFiles which allows skipping unmatched column families


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9cb0094b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9cb0094b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9cb0094b

Branch: refs/heads/master
Commit: 9cb0094bdbecf95455388278430833188bf021ef
Parents: 0d05c75
Author: tedyu <yu...@gmail.com>
Authored: Tue Aug 30 03:00:59 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Aug 30 03:00:59 2016 -0700

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 62 ++++++++++++++++----
 1 file changed, 51 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9cb0094b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index a34dc0a..6978e23 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -114,6 +114,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     = "hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily";
   private static final String ASSIGN_SEQ_IDS = "hbase.mapreduce.bulkload.assign.sequenceNumbers";
   public final static String CREATE_TABLE_CONF_KEY = "create.table";
+  public final static String SILENCE_CONF_KEY = "ignore.unmatched.families";
 
   // We use a '.' prefix which is ignored when walking directory trees
   // above. It is invalid family name.
@@ -121,6 +122,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   private int maxFilesPerRegionPerFamily;
   private boolean assignSeqIds;
+  private Set<String> unmatchedFamilies = new HashSet<String>();
 
   // Source filesystem
   private FileSystem fs;
@@ -160,7 +162,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void usage() {
     System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
         + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
+        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n -D"
+        + SILENCE_CONF_KEY + "=yes - can be used to ignore unmatched column families\n"
         + "\n");
   }
 
@@ -308,11 +311,30 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    *
    * @param hfofDir the directory that was provided as the output path
    *   of a job using HFileOutputFormat
+   * @param admin the Admin
    * @param table the table to load into
+   * @param regionLocator region locator
    * @throws TableNotFoundException if table does not yet exist
    */
   public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
-      RegionLocator regionLocator) throws TableNotFoundException, IOException  {
+      RegionLocator regionLocator) throws TableNotFoundException, IOException {
+    doBulkLoad(hfofDir, admin, table, regionLocator, false);
+  }
+
+  /**
+   * Perform a bulk load of the given directory into the given
+   * pre-existing table.  This method is not threadsafe.
+   *
+   * @param hfofDir the directory that was provided as the output path
+   *   of a job using HFileOutputFormat
+   * @param admin the Admin
+   * @param table the table to load into
+   * @param regionLocator region locator
+   * @param silence true to ignore unmatched column families
+   * @throws TableNotFoundException if table does not yet exist
+   */
+  public void doBulkLoad(Path hfofDir, final Admin admin, Table table,
+      RegionLocator regionLocator, boolean silence) throws TableNotFoundException, IOException {
 
     if (!admin.isTableAvailable(regionLocator.getName())) {
       throw new TableNotFoundException("Table " + table.getName() + " is not currently available.");
@@ -337,7 +359,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 	    "option, consider removing the files and bulkload again without this option. " +
 	    "See HBASE-13985");
       }
-      prepareHFileQueue(hfofDir, table, queue, validateHFile);
+      prepareHFileQueue(hfofDir, table, queue, validateHFile, silence);
 
       int count = 0;
 
@@ -429,8 +451,24 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
    */
   public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
       boolean validateHFile) throws IOException {
+    prepareHFileQueue(hfilesDir, table, queue, validateHFile, false);
+  }
+
+  /**
+   * Prepare a collection of {@link LoadQueueItem} from list of source hfiles contained in the
+   * passed directory and validates whether the prepared queue has all the valid table column
+   * families in it.
+   * @param hfilesDir directory containing list of hfiles to be loaded into the table
+   * @param table table to which hfiles should be loaded
+   * @param queue queue which needs to be loaded into the table
+   * @param validateHFile if true hfiles will be validated for its format
+   * @param silence  true to ignore unmatched column families
+   * @throws IOException If any I/O or network error occurred
+   */
+  public void prepareHFileQueue(Path hfilesDir, Table table, Deque<LoadQueueItem> queue,
+      boolean validateHFile, boolean silence) throws IOException {
     discoverLoadQueue(queue, hfilesDir, validateHFile);
-    validateFamiliesInHFiles(table, queue);
+    validateFamiliesInHFiles(table, queue, silence);
   }
 
   // Initialize a thread pool
@@ -446,14 +484,13 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   /**
    * Checks whether there is any invalid family name in HFiles to be bulk loaded.
    */
-  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue)
+  private void validateFamiliesInHFiles(Table table, Deque<LoadQueueItem> queue, boolean silence)
       throws IOException {
     Collection<HColumnDescriptor> families = table.getTableDescriptor().getFamilies();
     List<String> familyNames = new ArrayList<>(families.size());
     for (HColumnDescriptor family : families) {
       familyNames.add(family.getNameAsString());
     }
-    List<String> unmatchedFamilies = new ArrayList<String>();
     Iterator<LoadQueueItem> queueIter = queue.iterator();
     while (queueIter.hasNext()) {
       LoadQueueItem lqi = queueIter.next();
@@ -468,7 +505,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
               + unmatchedFamilies + "; valid family names of table " + table.getName() + " are: "
               + familyNames;
       LOG.error(msg);
-      throw new IOException(msg);
+      if (!silence) throw new IOException(msg);
     }
   }
 
@@ -781,7 +818,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   throws IOException {
     final List<Pair<byte[], String>> famPaths = new ArrayList<>(lqis.size());
     for (LoadQueueItem lqi : lqis) {
-      famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      if (!unmatchedFamilies.contains(Bytes.toString(lqi.family))) {
+        famPaths.add(Pair.newPair(lqi.family, lqi.hfilePath.toString()));
+      }
     }
     final RegionServerCallable<Boolean> svrCallable = new RegionServerCallable<Boolean>(conn,
         rpcControllerFactory, tableName, first) {
@@ -1036,7 +1075,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length < 2) {
       usage();
       return -1;
     }
@@ -1062,7 +1101,8 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
       try (Table table = connection.getTable(tableName);
         RegionLocator locator = connection.getRegionLocator(tableName)) {
-        doBulkLoad(hfofDir, admin, table, locator);
+        boolean silence = "yes".equalsIgnoreCase(getConf().get(SILENCE_CONF_KEY, ""));
+        doBulkLoad(hfofDir, admin, table, locator, silence);
       }
     }
 
@@ -1087,4 +1127,4 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   public void setBulkToken(String stagingDir) {
     this.bulkToken = stagingDir;
   }
-}
\ No newline at end of file
+}