You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/05/25 02:21:02 UTC
[hbase] 01/02: HBASE-24387 TableSnapshotInputFormatImpl support row
limit on each InputSplit (#1731)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 61eb7e50034258494e07cd0eaf4d1be70e818d57
Author: niuyulin <ny...@163.com>
AuthorDate: Sun May 24 20:55:48 2020 -0500
HBASE-24387 TableSnapshotInputFormatImpl support row limit on each InputSplit (#1731)
Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
.../mapreduce/TableSnapshotInputFormatImpl.java | 23 +++++++---
.../mapreduce/TestTableSnapshotInputFormat.java | 51 ++++++++++++++++++++++
2 files changed, 69 insertions(+), 5 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 9758f15..28b832e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -102,6 +102,12 @@ public class TableSnapshotInputFormatImpl {
public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
/**
+ * Row limit applied to each InputSplit, e.g. for sampling data; a value <= 0 means no limit.
+ */
+ public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
+ "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";
+
+ /**
* Implementation class for InputSplit logic common between mapred and mapreduce.
*/
public static class InputSplit implements Writable {
@@ -213,6 +219,8 @@ public class TableSnapshotInputFormatImpl {
private Result result = null;
private ImmutableBytesWritable row = null;
private ClientSideRegionScanner scanner;
+ private int numOfCompleteRows = 0;
+ private int rowLimitPerSplit;
public ClientSideRegionScanner getScanner() {
return scanner;
@@ -221,6 +229,7 @@ public class TableSnapshotInputFormatImpl {
public void initialize(InputSplit split, Configuration conf) throws IOException {
this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
this.split = split;
+ this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
TableDescriptor htd = split.htd;
HRegionInfo hri = this.split.getRegionInfo();
FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
@@ -244,6 +253,9 @@ public class TableSnapshotInputFormatImpl {
return false;
}
+ if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) {
+ return false;
+ }
if (this.row == null) {
this.row = new ImmutableBytesWritable();
}
@@ -296,10 +308,11 @@ public class TableSnapshotInputFormatImpl {
return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
}
- public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException{
+ public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
String splitAlgoClassName = conf.get(SPLIT_ALGO);
- if (splitAlgoClassName == null)
+ if (splitAlgoClassName == null) {
return null;
+ }
try {
return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
.getDeclaredConstructor().newInstance();
@@ -511,9 +524,9 @@ public class TableSnapshotInputFormatImpl {
* Configures the job to use TableSnapshotInputFormat to read from a snapshot.
* @param conf the job to configure
* @param snapshotName the name of the snapshot to read from
- * @param restoreDir a temporary directory to restore the snapshot into. Current user should
- * have write permissions to this directory, and this should not be a subdirectory of rootdir.
- * After the job is finished, restoreDir can be deleted.
+ * @param restoreDir a temporary directory to restore the snapshot into. Current user should have
+ * write permissions to this directory, and this should not be a subdirectory of rootdir.
+ * After the job is finished, restoreDir can be deleted.
* @param numSplitsPerRegion how many input splits to generate per one region
* @param splitAlgo SplitAlgorithm to be used when generating InputSplits
* @throws IOException if an error occurs
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 5f187c6..d98340f 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -305,6 +306,56 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
}
@Test
+ public void testScanLimit() throws Exception {
+ setupCluster();
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ final String snapshotName = tableName + "Snapshot";
+ Table table = null;
+ try {
+ UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
+ if (UTIL.getAdmin().tableExists(tableName)) {
+ UTIL.deleteTable(tableName);
+ }
+
+ UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });
+
+ Admin admin = UTIL.getAdmin();
+
+ int regionNum = admin.getRegions(tableName).size();
+ // put some stuff in the table
+ table = UTIL.getConnection().getTable(tableName);
+ UTIL.loadTable(table, FAMILIES);
+
+ Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
+ FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+
+ SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+ null, snapshotName, rootDir, fs, true);
+
+ Job job = new Job(UTIL.getConfiguration());
+ Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+ Scan scan = new Scan();
+ TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+ TestTableSnapshotInputFormat.class);
+
+ TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+ RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
+ tmpTableDir);
+ Assert.assertTrue(job.waitForCompletion(true));
+ Assert.assertEquals(10 * regionNum,
+ job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
+ } finally {
+ if (table != null) {
+ table.close();
+ }
+ UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
+ UTIL.getAdmin().deleteSnapshot(snapshotName);
+ UTIL.deleteTable(tableName);
+ tearDownCluster();
+ }
+ }
+
+ @Test
public void testNoDuplicateResultsWhenSplitting() throws Exception {
setupCluster();
TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");