Posted to commits@hbase.apache.org by ch...@apache.org on 2018/01/12 21:35:53 UTC
hbase git commit: HBASE-11409 - Add more flexibility for input directory structure to LoadIncrementalHFiles
Repository: hbase
Updated Branches:
refs/heads/branch-1 6f29a39d7 -> 48025cc84
HBASE-11409 - Add more flexibility for input directory structure to LoadIncrementalHFiles
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/48025cc8
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/48025cc8
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/48025cc8
Branch: refs/heads/branch-1
Commit: 48025cc84b791c7d64e7c01c59a06a4e16897459
Parents: 6f29a39
Author: Rahul Gidwani <ch...@apache.org>
Authored: Fri Jan 12 13:35:26 2018 -0800
Committer: Rahul Gidwani <ch...@apache.org>
Committed: Fri Jan 12 13:35:26 2018 -0800
----------------------------------------------------------------------
.../hbase/mapreduce/LoadIncrementalHFiles.java | 50 +++++++++----
.../mapreduce/TestLoadIncrementalHFiles.java | 74 ++++++++++++--------
2 files changed, 79 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
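In short: before this change, LoadIncrementalHFiles accepted only the HFileOutputFormat layout, where column family directories sit directly under the input directory (a depth of 2: family dir, then store files). The patch adds a configurable depth so that a depth-3 tree, with a region directory level between the input directory and the family directories, can be loaded as well. A sketch of the two layouts (directory names are illustrative; "someRegion" is the name the new test uses):

depth 2 (default):
  hfofDir/
    family/
      hfile

depth 3 (setDepth(3), or the -loadTable command-line form):
  baseDirectory/
    someRegion/
      family/
        hfile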
http://git-wip-us.apache.org/repos/asf/hbase/blob/48025cc8/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
index 853b59d..9d7d80b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.mapreduce;
+import com.google.common.annotations.VisibleForTesting;
import static java.lang.String.format;
import java.io.FileNotFoundException;
@@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.security.access.SecureBulkLoadEndpoint;
import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSHDFSUtils;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@@ -135,6 +137,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private String bulkToken;
private UserProvider userProvider;
private int nrThreads;
+ private int depth = 2;
private LoadIncrementalHFiles() {}
@@ -143,6 +146,10 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
initialize();
}
+ public void setDepth(int depth) {
+ this.depth = depth;
+ }
+
private void initialize() throws Exception {
if (hbAdmin == null) {
// make a copy, just to be sure we're not overriding someone else's config
@@ -161,9 +168,11 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
private void usage() {
- System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename" + "\n -D"
-   + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by this tool\n"
-   + " Note: if you set this to 'no', then the target table must already exist in HBase\n"
+ System.err.println("usage: " + NAME + " /path/to/hfileoutputformat-output tablename -loadTable"
+   + "\n -D" + CREATE_TABLE_CONF_KEY + "=no - can be used to avoid creation of table by "
+   + "this tool\n Note: if you set this to 'no', then the target table must already exist "
+   + "in HBase\n -loadTable implies your baseDirectory to store files has a depth of 3; you"
+   + " must have an existing table"
  + "\n");
}
@@ -287,22 +296,32 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
private void discoverLoadQueue(final Deque<LoadQueueItem> ret, final Path hfofDir,
final boolean validateHFile) throws IOException {
fs = hfofDir.getFileSystem(getConf());
- visitBulkHFiles(fs, hfofDir, new BulkHFileVisitor<byte[]>() {
- @Override
- public byte[] bulkFamily(final byte[] familyName) {
+ BulkHFileVisitor<byte[]> visitor = new BulkHFileVisitor<byte[]>() {
+ @Override public byte[] bulkFamily(final byte[] familyName) {
return familyName;
}
- @Override
- public void bulkHFile(final byte[] family, final FileStatus hfile) throws IOException {
+
+ @Override public void bulkHFile(final byte[] family, final FileStatus hfile)
+ throws IOException {
long length = hfile.getLen();
- if (length > getConf().getLong(HConstants.HREGION_MAX_FILESIZE,
- HConstants.DEFAULT_MAX_FILE_SIZE)) {
- LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " +
- length + " bytes can be problematic as it may lead to oversplitting.");
+ if (length > getConf()
+ .getLong(HConstants.HREGION_MAX_FILESIZE, HConstants.DEFAULT_MAX_FILE_SIZE)) {
+ LOG.warn("Trying to bulk load hfile " + hfile.getPath() + " with size: " + length
+ + " bytes can be problematic as it may lead to oversplitting.");
}
ret.add(new LoadQueueItem(family, hfile.getPath()));
}
- }, validateHFile);
+ };
+ if (depth == 2) {
+ visitBulkHFiles(fs, hfofDir, visitor, validateHFile);
+ } else if (depth == 3) {
+ for (FileStatus fileStatus : FSUtils.listStatus(fs, hfofDir)) {
+ visitBulkHFiles(fs, fileStatus.getPath(), visitor, validateHFile);
+ }
+ } else {
+ throw new IllegalArgumentException("Depth of HFiles from directory must be 2 or 3");
+ }
+
}
/**
@@ -1096,7 +1115,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
- if (args.length != 2) {
+ if (args.length != 2 && args.length != 3) {
usage();
return -1;
}
@@ -1105,6 +1124,9 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
String dirPath = args[0];
TableName tableName = TableName.valueOf(args[1]);
+ if (args.length == 3) {
+ this.setDepth(3);
+ }
boolean tableExists = this.doesTableExist(tableName);
if (!tableExists) {
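For programmatic callers, the new setDepth(int) setter composes with the existing doBulkLoad entry point. A minimal sketch of a depth-3 load, assuming a pre-created table (the class name, table name, and input path below are illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

public class Depth3BulkLoadExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
    // Input layout: baseDirectory/<regionDir>/<familyDir>/<hfile>
    loader.setDepth(3);
    // With depth 3 the target table must already exist.
    try (HTable table = new HTable(conf, TableName.valueOf("mytable"))) {
      loader.doBulkLoad(new Path("/bulk/baseDirectory"), table);
    }
  }
}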
http://git-wip-us.apache.org/repos/asf/hbase/blob/48025cc8/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
index e200442..13a2274 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
@@ -22,11 +22,9 @@ import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-
import java.io.IOException;
import java.util.Locale;
import java.util.TreeMap;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -36,11 +34,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.codec.KeyValueCodecWithTags;
@@ -118,7 +116,7 @@ public class TestLoadIncrementalHFiles {
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("cccc") },
new byte[][]{ Bytes.toBytes("ddd"), Bytes.toBytes("ooo") },
- });
+ }, 2);
}
/**
@@ -131,7 +129,7 @@ public class TestLoadIncrementalHFiles {
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- });
+ }, 2);
}
/**
@@ -143,7 +141,7 @@ public class TestLoadIncrementalHFiles {
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- });
+ }, 2);
}
/**
@@ -155,7 +153,7 @@ public class TestLoadIncrementalHFiles {
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- });
+ }, 2);
}
/**
@@ -172,8 +170,7 @@ public class TestLoadIncrementalHFiles {
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("lll") },
new byte[][]{ Bytes.toBytes("mmm"), Bytes.toBytes("zzz") },
- }
- );
+ }, 2);
}
/**
@@ -221,8 +218,7 @@ public class TestLoadIncrementalHFiles {
},
new byte[][][] {
new byte[][] { Bytes.toBytes("aaaa"), Bytes.toBytes("zzz") },
- }
- );
+ }, 2);
}
private void testRegionCrossingHFileSplit(BloomType bloomType) throws Exception {
@@ -234,8 +230,7 @@ public class TestLoadIncrementalHFiles {
new byte[][][] {
new byte[][]{ Bytes.toBytes("aaaa"), Bytes.toBytes("eee") },
new byte[][]{ Bytes.toBytes("fff"), Bytes.toBytes("zzz") },
- }
- );
+ }, 2);
}
private HTableDescriptor buildHTD(TableName tableName, BloomType bloomType) {
@@ -246,39 +241,56 @@ public class TestLoadIncrementalHFiles {
return htd;
}
- private void runTest(String testName, BloomType bloomType,
- byte[][][] hfileRanges) throws Exception {
- runTest(testName, bloomType, null, hfileRanges);
+ private void runTest(String testName, BloomType bloomType, byte[][][] hfileRanges,
+ int depth) throws Exception {
+ runTest(testName, bloomType, null, hfileRanges, depth);
}
- private void runTest(String testName, BloomType bloomType,
- byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+ private void runTest(String testName, BloomType bloomType, byte[][] tableSplitKeys,
+ byte[][][] hfileRanges, int depth) throws Exception {
final byte[] TABLE_NAME = Bytes.toBytes("mytable_"+testName);
final boolean preCreateTable = tableSplitKeys != null;
// Run the test bulkloading the table to the default namespace
final TableName TABLE_WITHOUT_NS = TableName.valueOf(TABLE_NAME);
- runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+ runTest(testName, TABLE_WITHOUT_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, 2);
+
+ /* Run the test bulkloading the table from a depth of 3
+    directory structure is now
+    baseDirectory
+      -- regionDir
+        -- familyDir
+          -- storeFileDir
+ */
+ if (preCreateTable) {
+ runTest(testName + 2, TABLE_WITHOUT_NS, bloomType, true, tableSplitKeys, hfileRanges, 3);
+ }
// Run the test bulkloading the table to the specified namespace
final TableName TABLE_WITH_NS = TableName.valueOf(Bytes.toBytes(NAMESPACE), TABLE_NAME);
- runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+ runTest(testName, TABLE_WITH_NS, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
}
private void runTest(String testName, TableName tableName, BloomType bloomType,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+ boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+ throws Exception {
HTableDescriptor htd = buildHTD(tableName, bloomType);
- runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges);
+ runTest(testName, htd, bloomType, preCreateTable, tableSplitKeys, hfileRanges, depth);
}
private void runTest(String testName, HTableDescriptor htd, BloomType bloomType,
- boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges) throws Exception {
+ boolean preCreateTable, byte[][] tableSplitKeys, byte[][][] hfileRanges, int depth)
+ throws Exception {
for (boolean managed : new boolean[] { true, false }) {
- Path dir = util.getDataTestDirOnTestFS(testName);
+ Path baseDirectory = util.getDataTestDirOnTestFS(testName);
FileSystem fs = util.getTestFileSystem();
- dir = dir.makeQualified(fs);
- Path familyDir = new Path(dir, Bytes.toString(FAMILY));
+ baseDirectory = baseDirectory.makeQualified(fs);
+ Path parentDir = baseDirectory;
+ if (depth == 3) {
+ parentDir = new Path(baseDirectory, "someRegion");
+ }
+ Path familyDir = new Path(parentDir, Bytes.toString(FAMILY));
int hfileIdx = 0;
for (byte[][] range : hfileRanges) {
@@ -298,16 +310,16 @@ public class TestLoadIncrementalHFiles {
util.getHBaseAdmin().createTable(htd);
}
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(util.getConfiguration());
-
+ loader.setDepth(depth);
if (managed) {
try (HTable table = new HTable(util.getConfiguration(), tableName)) {
- loader.doBulkLoad(dir, table);
+ loader.doBulkLoad(baseDirectory, table);
assertEquals(expectedRows, util.countRows(table));
}
} else {
try (Connection conn = ConnectionFactory.createConnection(util.getConfiguration());
HTable table = (HTable) conn.getTable(tableName)) {
- loader.doBulkLoad(dir, table);
+ loader.doBulkLoad(baseDirectory, table);
}
}
@@ -390,7 +402,7 @@ public class TestLoadIncrementalHFiles {
htd.addFamily(family);
try {
- runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges);
+ runTest(testName, htd, BloomType.NONE, true, SPLIT_KEYS, hFileRanges, 2);
assertTrue("Loading into table with non-existent family should have failed", false);
} catch (Exception e) {
assertTrue("IOException expected", e instanceof IOException);