You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2021/09/14 16:59:02 UTC
[hbase] branch branch-2.3 updated: HBASE-26273 Force
ReadType.STREAM when the user does not explicitly set a ReadType on the
Scan for a Snapshot-based Job
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.3 by this push:
new 565d419 HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a ReadType on the Scan for a Snapshot-based Job
565d419 is described below
commit 565d4197f531280bf00d5677c5fd27e93cebc53e
Author: Josh Elser <el...@apache.org>
AuthorDate: Fri Sep 10 16:24:13 2021 -0400
HBASE-26273 Force ReadType.STREAM when the user does not explicitly set a ReadType on the Scan for a Snapshot-based Job
HBase 2 moved Scans over to using PREAD by default, instead of STREAM as
HBase 1 did. In the context of a MapReduce job, we can generally expect that
clients using the InputFormat (batch job) would be reading most of the
data for a job. Cater to them, but still give users who want PREAD the
ability to choose it.
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Tak Lon (Stephen) Wu <ta...@apache.org>
---
.../mapreduce/TableSnapshotInputFormatImpl.java | 18 ++++++++++++
.../mapreduce/TestTableSnapshotInputFormat.java | 33 ++++++++++++++++++++++
2 files changed, 51 insertions(+)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
index c3f05f4..f83a9b9 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatImpl.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.client.IsolationLevel;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -129,6 +130,14 @@ public class TableSnapshotInputFormatImpl {
public static final boolean SNAPSHOT_INPUTFORMAT_SCAN_METRICS_ENABLED_DEFAULT = true;
/**
+ * The {@link ReadType} which should be set on the {@link Scan} to read the HBase Snapshot,
+ * default STREAM.
+ */
+ public static final String SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE =
+ "hbase.TableSnapshotInputFormat.scanner.readtype";
+ public static final ReadType SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT = ReadType.STREAM;
+
+ /**
* Implementation class for InputSplit logic common between mapred and mapreduce.
*/
public static class InputSplit implements Writable {
@@ -382,6 +391,15 @@ public class TableSnapshotInputFormatImpl {
} else {
throw new IllegalArgumentException("Unable to create scan");
}
+
+ if (scan.getReadType() == ReadType.DEFAULT) {
+ LOG.info("Provided Scan has DEFAULT ReadType,"
+ + " updating STREAM for Snapshot-based InputFormat");
+ // Update the "DEFAULT" ReadType to be "STREAM" to try to improve the default case.
+ scan.setReadType(conf.getEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE,
+ SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE_DEFAULT));
+ }
+
return scan;
}
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
index 0820f3b..b1a07f0 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableSnapshotInputFormat.java
@@ -22,6 +22,8 @@ import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNA
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_ROW_LIMIT_PER_INPUTSPLIT;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION;
import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_LOCALITY_BY_REGION_LOCATION_DEFAULT;
+import static org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl.SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE;
+import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Scan.ReadType;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TestTableSnapshotScanner;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -407,6 +410,36 @@ public class TestTableSnapshotInputFormat extends TableSnapshotInputFormatTestBa
}
}
+ @Test
+ public void testScannerReadTypeConfiguration() throws IOException {
+ Configuration conf = new Configuration(false);
+ // Explicitly set ReadTypes should persist
+ for (ReadType readType : Arrays.asList(ReadType.PREAD, ReadType.STREAM)) {
+ Scan scanWithReadType = new Scan();
+ scanWithReadType.setReadType(readType);
+ assertEquals(scanWithReadType.getReadType(),
+ serializeAndReturn(conf, scanWithReadType).getReadType());
+ }
+ // We should only see the DEFAULT ReadType getting updated to STREAM.
+ Scan scanWithoutReadType = new Scan();
+ assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+ assertEquals(ReadType.STREAM, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+
+ // We should still be able to force a certain ReadType when DEFAULT is given.
+ conf.setEnum(SNAPSHOT_INPUTFORMAT_SCANNER_READTYPE, ReadType.PREAD);
+ assertEquals(ReadType.DEFAULT, scanWithoutReadType.getReadType());
+ assertEquals(ReadType.PREAD, serializeAndReturn(conf, scanWithoutReadType).getReadType());
+ }
+
+ /**
+ * Serializes and deserializes the given scan in the same manner that
+ * TableSnapshotInputFormat does.
+ */
+ private Scan serializeAndReturn(Configuration conf, Scan s) throws IOException {
+ conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(s));
+ return TableSnapshotInputFormatImpl.extractScanFromConf(conf);
+ }
+
private void verifyWithMockedMapReduce(Job job, int numRegions, int expectedNumSplits,
byte[] startRow, byte[] stopRow)
throws IOException, InterruptedException {