Posted to commits@hbase.apache.org by ch...@apache.org on 2018/01/12 21:35:53 UTC

hbase git commit: HBASE-11409 - Add more flexibility for input directory structure to LoadIncrementalHFiles

Repository: hbase
Updated Branches:
  refs/heads/branch-1 6f29a39d7 -> 48025cc84


HBASE-11409 - Add more flexibility for input directory structure to LoadIncrementalHFiles


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48025cc8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48025cc8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48025cc8

Branch: refs/heads/branch-1
Commit: 48025cc84b791c7d64e7c01c59a06a4e16897459
Parents: 6f29a39
Author: Rahul Gidwani <ch...@apache.org>
Authored: Fri Jan 12 13:35:26 2018 -0800
Committer: Rahul Gidwani <ch...@apache.org>
Committed: Fri Jan 12 13:35:26 2018 -0800

----------------------------------------------------------------------
 .../hbase/mapreduce/LoadIncrementalHFiles.java  | 50 +++++++++----
 .../mapreduce/TestLoadIncrementalHFiles.java    | 74 ++++++++++++--------
 2 files changed, 79 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
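
As context for the patch below, here is a minimal sketch (not part of the commit) of how a caller might drive the new behaviour: the loader is pointed at a base directory whose children are region-level directories, and setDepth(3) tells it to descend one extra level before looking for family directories. The class name, table name, and paths are hypothetical, and the target table is assumed to already exist.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// Hypothetical helper class, illustrating the new setDepth() hook only.
public class BulkLoadDepth3Example {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    loader.setDepth(3);  // default remains 2 (family dirs directly under the input dir)
    try (Connection conn = ConnectionFactory.createConnection(conf);
         HTable table = (HTable) conn.getTable(TableName.valueOf("mytable"))) {
      // Input directory laid out as baseDir/regionDir/familyDir/hfile.
      loader.doBulkLoad(new Path("/bulkload/base"), table);
    }
  }
}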


http://git-wip-us.apache.org/repos/asf/hbase/blob/48025cc8/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 853b59d..9d7d80b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.mapreduce;
 
+import com.google.common.annotations.VisibleForTesting;
 import static java.lang.String.format;
 
 import java.io.FileNotFoundException;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
 import org.apache.hadoop.hbase.security.token.FsDelegationToken;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
@@ -135,6 +137,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private String bulkToken;
   private UserProvider userProvider;
   private int nrThreads;
+  private int depth = 2;
 
   private LoadIncrementalHFiles() {}
 
@@ -143,6 +146,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
     initialize();
   }
 
+  public void setDepth(int depth) {
+    this.depth = depth;
+  }
+
   private void initialize() throws Exception {
     if (hbAdmin == null) {
       // make a copy, just to be sure we're not overriding someone else's config
@@ -161,9 +168,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   }
 
   private void usage() {
-    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
-        + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-        + "  Note: if you set this to 'no', then the target table must already exist in HBase\n"
+    System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable"
+        + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by "
+        + "this tool\n  Note: if you set this to 'no', then the target table must already exist "
+        + "in HBase\n -loadTable implies your baseDirectory to store file has a depth of 3 ,you"
+        + " must have an existing table"
         + "\n");
   }
 
@@ -287,22 +296,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
   private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
     final boolean validateHFile) throws IOException {
     fs = hfofDir.getFileSystem(getConf());
-    visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
-      @Override
-      public byte[] bulkFamily(final byte[] familyName) {
+    BulkHFileVisitor<byte[]> visitor = new BulkHFileVisitor<byte[]>() {
+      @Override public byte[] bulkFamily(final byte[] familyName) {
         return familyName;
       }
-      @Override
-      public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+
+      @Override public void bulkHFile(final byte[] family, final FileStatus hfile)
+          throws IOException {
         long length = hfile.getLen();
-        if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
-            HConstants.DEFAULT_MAX_FILE_SIZE)) {
-          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
-              length + " bytes can be problematic as it may lead to oversplitting.");
+        if (length > getConf()
+            .getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)) {
+          LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
+              + " bytes can be problematic as it may lead to oversplitting.");
         }
         ret.add(new LoadQueueItem(family, hfile.getPath()));
       }
-    }, validateHFile);
+    };
+    if (depth == 2) {
+      visitBulkHFiles(fs, hfofDir, visitor, validateHFile);
+    } else if (depth == 3) {
+      for (FileStatus fileStatus : FSUtils.listStatus(fs, hfofDir)) {
+        visitBulkHFiles(fs, fileStatus.getPath(), visitor, validateHFile);
+      }
+    } else {
+      throw new IllegalArgumentException("Depth of HFiles from directory must be 2 or 3");
+    }
+
   }
 
   /**
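
To make the two accepted layouts concrete, here is a small hypothetical sketch of how a depth-3 input directory could be prepared before running the loader; with depth == 3, each child of the base directory is handed to visitBulkHFiles() as if it were an ordinary depth-2 bulk-load directory. The class and directory names are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical helper class showing the depth-3 layout visited above.
public class Depth3LayoutExample {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // depth 2: base/familyDir/hfile      depth 3: base/regionDir/familyDir/hfile
    Path base = new Path("/bulkload/base");
    // Two region-level directories, each holding a family directory; the HFiles
    // produced by HFileOutputFormat would be written under the family directories
    // before the loader is run with setDepth(3).
    fs.mkdirs(new Path(base, "region-0/cf"));
    fs.mkdirs(new Path(base, "region-1/cf"));
  }
}
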
@@ -1096,7 +1115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
   @Override
   public int run(String[] args) throws Exception {
-    if (args.length != 2) {
+    if (args.length != 2 && args.length != 3) {
       usage();
       return -1;
     }
@@ -1105,6 +1124,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
 
     String dirPath = args[0];
     TableName tableName = TableName.valueOf(args[1]);
+    if (args.length == 3) {
+      this.setDepth(3);
+    }
 
     boolean tableExists = this.doesTableExist(tableName);
     if (!tableExists) {
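
With the run() change above, the tool also accepts a third command-line argument to request the depth-3 layout. Below is a hedged sketch of a programmatic invocation through ToolRunner; the class name, paths, and table name are hypothetical, and note that run() only checks for the presence of a third argument, for which the usage text suggests -loadTable.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
import org.apache.hadoop.util.ToolRunner;

// Hypothetical helper class, not part of the patch.
public class RunLoaderExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Passing "-loadTable" as a third argument switches the loader to depth 3;
    // in that case the target table must already exist.
    int exitCode = ToolRunner.run(conf, new LoadIncrementalHFiles(conf),
        new String[] { "/path/to/hfileoutputformat-output", "mytable", "-loadTable" });
    System.exit(exitCode);
  }
}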

http://git-wip-us.apache.org/repos/asf/hbase/blob/48025cc8/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index e200442..13a2274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -22,11 +22,9 @@ import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.Locale;
 import java.util.TreeMap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -36,11 +34,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
@@ -118,7 +116,7 @@ public class TestLoadIncrementalHFiles {
         new byte[][][] {
           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
           new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
-    });
+        }, 2);
   }
 
   /**
@@ -131,7 +129,7 @@ public class TestLoadIncrementalHFiles {
         new byte[][][] {
           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
           new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+        }, 2);
   }
 
   /**
@@ -143,7 +141,7 @@ public class TestLoadIncrementalHFiles {
         new byte[][][] {
           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
           new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+        }, 2);
   }
 
   /**
@@ -155,7 +153,7 @@ public class TestLoadIncrementalHFiles {
         new byte[][][] {
           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
           new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-    });
+        }, 2);
   }
 
   /**
@@ -172,8 +170,7 @@ public class TestLoadIncrementalHFiles {
         new byte[][][] {
           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
           new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
-        }
-    );
+        }, 2);
   }
 
   /**
@@ -221,8 +218,7 @@ public class TestLoadIncrementalHFiles {
       },
       new byte[][][] {
         new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") },
-      }
-    );
+      }, 2);
   }
 
   private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
@@ -234,8 +230,7 @@ public class TestLoadIncrementalHFiles {
         new byte[][][] {
           new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
           new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
-        }
-    );
+        }, 2);
   }
 
   private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
@@ -246,39 +241,56 @@ public class TestLoadIncrementalHFiles {
     return htd;
   }
 
-  private void runTest(String testName, BloomType bloomType,
-      byte[][][] hfileRanges) throws Exception {
-    runTest(testName, bloomType, null, hfileRanges);
+  private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges,
+      int depth) throws Exception {
+    runTest(testName, bloomType, null, hfileRanges, depth);
   }
 
-  private void runTest(String testName, BloomType bloomType,
-      byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+  private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+      byte[][][] hfileRanges, int depth) throws Exception {
     final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
     final boolean preCreateTable = tableSplitKeys != null;
 
     // Run the test bulkloading the table to the default namespace
     final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
-    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 2);
+
+    /* Run the test bulk-loading the table from a depth-3 layout;
+       the directory structure is now
+        baseDirectory
+          -- regionDir
+            -- familyDir
+              -- storeFileDir
+    */
+    if (preCreateTable) {
+      runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, 3);
+    }
 
     // Run the test bulkloading the table to the specified namespace
     final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
-    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
   }
 
   private void runTest(String testName, TableName tableName, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+      throws Exception {
     HTableDescriptor htd = buildHTD(tableName, bloomType);
-    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+    runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
   }
 
   private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
-      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+      boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+      throws Exception {
 
     for (boolean managed : new boolean[] { true, false }) {
-      Path dir = util.getDataTestDirOnTestFS(testName);
+      Path baseDirectory = util.getDataTestDirOnTestFS(testName);
       FileSystem fs = util.getTestFileSystem();
-      dir = dir.makeQualified(fs);
-      Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+      baseDirectory = baseDirectory.makeQualified(fs);
+      Path parentDir = baseDirectory;
+      if (depth == 3) {
+        parentDir = new Path(baseDirectory, "someRegion");
+      }
+      Path familyDir = new Path(parentDir, Bytes.toString(FAMILY));
 
       int hfileIdx = 0;
       for (byte[][] range : hfileRanges) {
@@ -298,16 +310,16 @@ public class TestLoadIncrementalHFiles {
         util.getHBaseAdmin().createTable(htd);
       }
       LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-
+      loader.setDepth(depth);
       if (managed) {
         try (HTable table = new HTable(util.getConfiguration(), tableName)) {
-          loader.doBulkLoad(dir, table);
+          loader.doBulkLoad(baseDirectory, table);
           assertEquals(expectedRows, util.countRows(table));
         }
       } else {
         try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
             HTable table = (HTable) conn.getTable(tableName)) {
-          loader.doBulkLoad(dir, table);
+          loader.doBulkLoad(baseDirectory, table);
         }
       }
 
@@ -390,7 +402,7 @@ public class TestLoadIncrementalHFiles {
     htd.addFamily(family);
 
     try {
-      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
+      runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, 2);
       assertTrue("Loading into table with non-existent family should have failed", false);
     } catch (Exception e) {
       assertTrue("IOException expected", e instanceof IOException);