You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ie...@apache.org on 2017/05/19 08:51:12 UTC
[1/2] beam git commit: [BEAM-2391] Clone Scan in HBaseReader
Repository: beam
Updated Branches:
refs/heads/master f458065da -> 49245080a
[BEAM-2391] Clone Scan in HBaseReader
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1da8da79
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1da8da79
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1da8da79
Branch: refs/heads/master
Commit: 1da8da79dcc623c53b35d97419566b736447f6b6
Parents: f458065
Author: Dan Halperin <dh...@google.com>
Authored: Thu May 18 16:19:29 2017 -0400
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Fri May 19 10:49:55 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 5 +++--
.../org/apache/beam/sdk/io/hbase/HBaseIOTest.java | 17 +++++++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/1da8da79/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
index 3c42da9..849873c 100644
--- a/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
+++ b/sdks/java/io/hbase/src/main/java/org/apache/beam/sdk/io/hbase/HBaseIO.java
@@ -487,8 +487,9 @@ public class HBaseIO {
connection = ConnectionFactory.createConnection(configuration);
TableName tableName = TableName.valueOf(tableId);
Table table = connection.getTable(tableName);
- Scan scan = source.read.serializableScan.get();
- scanner = table.getScanner(scan);
+ // [BEAM-2319] We have to clone the Scan because the underlying scanner may mutate it.
+ Scan scanClone = new Scan(source.read.serializableScan.get());
+ scanner = table.getScanner(scanClone);
iter = scanner.iterator();
return advance();
}
http://git-wip-us.apache.org/repos/asf/beam/blob/1da8da79/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
index 1cdfc7f..4a06789 100644
--- a/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
+++ b/sdks/java/io/hbase/src/test/java/org/apache/beam/sdk/io/hbase/HBaseIOTest.java
@@ -33,6 +33,7 @@ import org.apache.beam.sdk.io.hbase.HBaseIO.HBaseSource;
import org.apache.beam.sdk.io.range.ByteKey;
import org.apache.beam.sdk.io.range.ByteKeyRange;
import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.SourceTestUtils;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Create;
@@ -205,6 +206,22 @@ public class HBaseIOTest {
assertSourcesEqualReferenceSource(source, splits, null /* options */);
}
+ /** Tests that a {@link HBaseSource} can be read twice, verifying its immutability. */
+ @Test
+ public void testReadingSourceTwice() throws Exception {
+ final String table = "TEST-READING-TWICE";
+ final int numRows = 10;
+
+ // Set up test table data and sample row keys for size estimation and splitting.
+ createTable(table);
+ writeData(table, numRows);
+
+ HBaseIO.Read read = HBaseIO.read().withConfiguration(conf).withTableId(table);
+ HBaseSource source = new HBaseSource(read, null /* estimatedSizeBytes */);
+ assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows));
+ // second read.
+ assertThat(SourceTestUtils.readFromSource(source, null), hasSize(numRows));
+ }
/** Tests reading all rows using a filter. */
@Test
[2/2] beam git commit: This closes #3178
Posted by ie...@apache.org.
This closes #3178
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/49245080
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/49245080
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/49245080
Branch: refs/heads/master
Commit: 49245080ad6393b5678077caa047f7a6ef0efffc
Parents: f458065 1da8da7
Author: Ismaël Mejía <ie...@apache.org>
Authored: Fri May 19 10:50:06 2017 +0200
Committer: Ismaël Mejía <ie...@apache.org>
Committed: Fri May 19 10:50:06 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/beam/sdk/io/hbase/HBaseIO.java | 5 +++--
.../org/apache/beam/sdk/io/hbase/HBaseIOTest.java | 17 +++++++++++++++++
2 files changed, 20 insertions(+), 2 deletions(-)
----------------------------------------------------------------------