Posted to commits@hbase.apache.org by zg...@apache.org on 2020/05/25 02:21:02 UTC

[hbase] 01/02: HBASE-24387 TableSnapshotInputFormatImpl support row limit on each InputSplit (#1731)

This is an automated email from the ASF dual-hosted git repository.

zghao pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 61eb7e50034258494e07cd0eaf4d1be70e818d57
Author: niuyulin <ny...@163.com>
AuthorDate: Sun May 24 20:55:48 2020 -0500

    HBASE-24387 TableSnapshotInputFormatImpl support row limit on each InputSplit (#1731)
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../mapreduce/TableSnapshotInputFormatImpl.java    | 23 +++++++---
 .../mapreduce/TestTableSnapshotInputFormat.java    | 51 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 5 deletions(-)
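
The patch adds a single knob: an integer property that caps how many rows each
InputSplit returns, with the default of 0 leaving behavior unchanged. A minimal
sketch of a job that uses it, assuming an existing snapshot; the snapshot name,
restore path, and mapper below are illustrative placeholders, not part of the
patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
    import org.apache.hadoop.hbase.mapreduce.TableMapper;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;

    public class SnapshotSampleJob {

      // Counts the rows it sees; emits nothing.
      public static class SampleMapper extends TableMapper<NullWritable, NullWritable> {
        @Override
        protected void map(ImmutableBytesWritable row, Result value, Context context)
            throws java.io.IOException, InterruptedException {
          context.getCounter("sample", "rows").increment(1);
        }
      }

      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // The new property from this patch; unset or <= 0 keeps the old
        // scan-everything behavior.
        conf.setInt("hbase.TableSnapshotInputFormat.row.limit.per.inputsplit", 100);
        Job job = Job.getInstance(conf, "snapshot-sample");
        TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", new Scan(),
          SampleMapper.class, NullWritable.class, NullWritable.class, job,
          true, new Path("/tmp/snapshot-restore")); // writable dir outside rootdir
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
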

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index 9758f15..28b832e 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -102,6 +102,12 @@ public class TableSnapshotInputFormatImpl {
   public static final boolean SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT = true;
 
   /**
+   * In some scenarios, scan only a limited number of rows on each InputSplit, e.g. for sampling data extraction
+   */
+  public static final String SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT =
+      "hbase.TableSnapshotInputFormat.row.limit.per.inputsplit";
+
+  /**
    * Implementation class for InputSplit logic common between mapred and mapreduce.
    */
   public static class InputSplit implements Writable {
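
Because the key is an ordinary Configuration property, it can also be set
client-wide in hbase-site.xml instead of per job; a sketch (a job-level
conf.setInt still overrides it as usual):

    <property>
      <name>hbase.TableSnapshotInputFormat.row.limit.per.inputsplit</name>
      <value>10</value>
    </property>
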
@@ -213,6 +219,8 @@ public class TableSnapshotInputFormatImpl {
     private Result result = null;
     private ImmutableBytesWritable row = null;
     private ClientSideRegionScanner scanner;
+    private int numOfCompleteRows = 0;
+    private int rowLimitPerSplit;
 
     public ClientSideRegionScanner getScanner() {
       return scanner;
@@ -221,6 +229,7 @@ public class TableSnapshotInputFormatImpl {
     public void initialize(InputSplit split, Configuration conf) throws IOException {
       this.scan = TableMapReduceUtil.convertStringToScan(split.getScan());
       this.split = split;
+      this.rowLimitPerSplit = conf.getInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 0);
       TableDescriptor htd = split.htd;
       HRegionInfo hri = this.split.getRegionInfo();
       FileSystem fs = CommonFSUtils.getCurrentFileSystem(conf);
@@ -244,6 +253,9 @@ public class TableSnapshotInputFormatImpl {
         return false;
       }
 
+      if (rowLimitPerSplit > 0 && ++this.numOfCompleteRows > rowLimitPerSplit) {
+        return false;
+      }
       if (this.row == null) {
         this.row = new ImmutableBytesWritable();
       }
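
Note the pre-increment in the guard: the counter passes rows 1 through
rowLimitPerSplit and trips on the next call, so each split yields exactly the
configured number of rows (when the region has that many), while a limit of 0
or less disables the check. A standalone sketch of the arithmetic, outside any
HBase code:

    int limit = 3, numOfCompleteRows = 0;
    for (int call = 1; call <= 5; call++) {
      boolean hasNext = !(limit > 0 && ++numOfCompleteRows > limit);
      System.out.println("call " + call + " -> " + hasNext);
      // Prints true for calls 1-3, false for calls 4 and 5.
    }
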
@@ -296,10 +308,11 @@ public class TableSnapshotInputFormatImpl {
     return getSplits(scan, manifest, regionInfos, restoreDir, conf, splitAlgo, numSplits);
   }
 
-  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException{
+  public static RegionSplitter.SplitAlgorithm getSplitAlgo(Configuration conf) throws IOException {
     String splitAlgoClassName = conf.get(SPLIT_ALGO);
-    if (splitAlgoClassName == null)
+    if (splitAlgoClassName == null) {
       return null;
+    }
     try {
       return Class.forName(splitAlgoClassName).asSubclass(RegionSplitter.SplitAlgorithm.class)
           .getDeclaredConstructor().newInstance();
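
The cleaned-up getSplitAlgo instantiates whatever class name the SPLIT_ALGO
property carries via its no-arg constructor, and returns null when the property
is unset. A sketch of selecting one of the stock algorithms, assuming
SPLIT_ALGO is the public constant read above and importing
org.apache.hadoop.hbase.util.RegionSplitter:

    // Split each region's key range uniformly when generating extra InputSplits.
    conf.set(TableSnapshotInputFormatImpl.SPLIT_ALGO,
      RegionSplitter.UniformSplit.class.getName());
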
@@ -511,9 +524,9 @@ public class TableSnapshotInputFormatImpl {
    * Configures the job to use TableSnapshotInputFormat to read from a snapshot.
    * @param conf the job to configure
    * @param snapshotName the name of the snapshot to read from
-   * @param restoreDir a temporary directory to restore the snapshot into. Current user should
-   * have write permissions to this directory, and this should not be a subdirectory of rootdir.
-   * After the job is finished, restoreDir can be deleted.
+   * @param restoreDir a temporary directory to restore the snapshot into. Current user should have
+   *          write permissions to this directory, and this should not be a subdirectory of rootdir.
+   *          After the job is finished, restoreDir can be deleted.
    * @param numSplitsPerRegion how many input splits to generate per one region
    * @param splitAlgo SplitAlgorithm to be used when generating InputSplits
    * @throws IOException if an error occurs
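
For reference, the reflowed javadoc above belongs to the variant that also
takes a split algorithm and a per-region split count; a sketch of the
corresponding TableMapReduceUtil overload, assuming one with this shape exists
on branch-2 (values and names are placeholders carried over from the sketch
earlier in this message). Combined with the new row limit, each of the
regions x numSplitsPerRegion splits is capped independently:

    TableMapReduceUtil.initTableSnapshotMapperJob("mySnapshot", new Scan(),
      SampleMapper.class, NullWritable.class, NullWritable.class, job,
      true, new Path("/tmp/snapshot-restore"),
      new RegionSplitter.UniformSplit(), 2); // 2 InputSplits per region
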
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 5f187c6..d98340f 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_ENABLED_KEY;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -305,6 +306,56 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
   }
 
   @Test
+  public void testScanLimit() throws Exception {
+    setupCluster();
+    final TableName tableName = TableName.valueOf(name.getMethodName());
+    final String snapshotName = tableName + "Snapshot";
+    Table table = null;
+    try {
+      UTIL.getConfiguration().setInt(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT, 10);
+      if (UTIL.getAdmin().tableExists(tableName)) {
+        UTIL.deleteTable(tableName);
+      }
+
+      UTIL.createTable(tableName, FAMILIES, new byte[][] { bbb, yyy });
+
+      Admin admin = UTIL.getAdmin();
+
+      int regionNum = admin.getRegions(tableName).size();
+      // put some stuff in the table
+      table = UTIL.getConnection().getTable(tableName);
+      UTIL.loadTable(table, FAMILIES);
+
+      Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
+      FileSystem fs = rootDir.getFileSystem(UTIL.getConfiguration());
+
+      SnapshotTestingUtils.createSnapshotAndValidate(admin, tableName, Arrays.asList(FAMILIES),
+        null, snapshotName, rootDir, fs, true);
+
+      Job job = new Job(UTIL.getConfiguration());
+      Path tmpTableDir = UTIL.getDataTestDirOnTestFS(snapshotName);
+      Scan scan = new Scan();
+      TableMapReduceUtil.addDependencyJarsForClasses(job.getConfiguration(),
+        TestTableSnapshotInputFormat.class);
+
+      TableMapReduceUtil.initTableSnapshotMapperJob(snapshotName, scan,
+        RowCounter.RowCounterMapper.class, NullWritable.class, NullWritable.class, job, true,
+        tmpTableDir);
+      Assert.assertTrue(job.waitForCompletion(true));
+      Assert.assertEquals(10 * regionNum,
+        job.getCounters().findCounter(RowCounter.RowCounterMapper.Counters.ROWS).getValue());
+    } finally {
+      if (table != null) {
+        table.close();
+      }
+      UTIL.getConfiguration().unset(SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT);
+      UTIL.getAdmin().deleteSnapshot(snapshotName);
+      UTIL.deleteTable(tableName);
+      tearDownCluster();
+    }
+  }
+
+  @Test
   public void testNoDuplicateResultsWhenSplitting() throws Exception {
     setupCluster();
     TableName tableName = TableName.valueOf("testNoDuplicateResultsWhenSplitting");