You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/19 22:05:22 UTC
[1/2] incubator-beam git commit: [BEAM-639] BigtableIO: add support
for users to scan table subranges
Repository: incubator-beam
Updated Branches:
refs/heads/master 984b32ff2 -> 62c56c99b
[BEAM-639] BigtableIO: add support for users to scan table subranges
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/dace48c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/dace48c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/dace48c7
Branch: refs/heads/master
Commit: dace48c70d2d00500514c53734e9dd45dcb1465f
Parents: 984b32f
Author: Dan Halperin <dh...@google.com>
Authored: Mon Sep 19 11:54:07 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 19 15:05:17 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 54 ++++++++---
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 95 ++++++++++++++++----
2 files changed, 122 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dace48c7/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
index 67dde50..c1b882a 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIO.java
@@ -76,8 +76,9 @@ import org.slf4j.LoggerFactory;
*
* <p>To configure a Cloud Bigtable source, you must supply a table id and a {@link BigtableOptions}
* or builder configured with the project and other information necessary to identify the
- * Bigtable instance. A {@link RowFilter} may also optionally be specified using
- * {@link BigtableIO.Read#withRowFilter}. For example:
+ * Bigtable instance. By default, {@link BigtableIO.Read} will read all rows in the table. The row
+ * range to be read can optionally be restricted using {@link BigtableIO.Read#withKeyRange}, and
+ * a {@link RowFilter} can be specified using {@link BigtableIO.Read#withRowFilter}. For example:
*
* <pre>{@code
* BigtableOptions.Builder optionsBuilder =
@@ -93,6 +94,14 @@ import org.slf4j.LoggerFactory;
* .withBigtableOptions(optionsBuilder)
* .withTableId("table"));
*
+ * // Scan a prefix of the table.
+ * ByteKeyRange keyRange = ...;
+ * p.apply("read",
+ * BigtableIO.read()
+ * .withBigtableOptions(optionsBuilder)
+ * .withTableId("table")
+ * .withKeyRange(keyRange));
+ *
* // Scan a subset of rows that match the specified row filter.
* p.apply("filtered read",
* BigtableIO.read()
@@ -152,7 +161,7 @@ public class BigtableIO {
*/
@Experimental
public static Read read() {
- return new Read(null, "", null, null);
+ return new Read(null, "", ByteKeyRange.ALL_KEYS, null, null);
}
/**
@@ -215,7 +224,7 @@ public class BigtableIO {
.build());
BigtableOptions optionsWithAgent = clonedBuilder.setUserAgent(getUserAgent()).build();
- return new Read(optionsWithAgent, tableId, filter, bigtableService);
+ return new Read(optionsWithAgent, tableId, keyRange, filter, bigtableService);
}
/**
@@ -226,7 +235,17 @@ public class BigtableIO {
*/
public Read withRowFilter(RowFilter filter) {
checkNotNull(filter, "filter");
- return new Read(options, tableId, filter, bigtableService);
+ return new Read(options, tableId, keyRange, filter, bigtableService);
+ }
+
+ /**
+ * Returns a new {@link BigtableIO.Read} that will read only rows in the specified range.
+ *
+ * <p>Does not modify this object.
+ */
+ public Read withKeyRange(ByteKeyRange keyRange) {
+ checkNotNull(keyRange, "keyRange");
+ return new Read(options, tableId, keyRange, filter, bigtableService);
}
/**
@@ -236,7 +255,7 @@ public class BigtableIO {
*/
public Read withTableId(String tableId) {
checkNotNull(tableId, "tableId");
- return new Read(options, tableId, filter, bigtableService);
+ return new Read(options, tableId, keyRange, filter, bigtableService);
}
/**
@@ -247,6 +266,14 @@ public class BigtableIO {
}
/**
+ * Returns the range of keys that will be read from the table. By default, returns
+ * {@link ByteKeyRange#ALL_KEYS} to scan the entire table.
+ */
+ public ByteKeyRange getKeyRange() {
+ return keyRange;
+ }
+
+ /**
* Returns the table being read from.
*/
public String getTableId() {
@@ -256,7 +283,7 @@ public class BigtableIO {
@Override
public PCollection<Row> apply(PBegin input) {
BigtableSource source =
- new BigtableSource(getBigtableService(), tableId, filter, ByteKeyRange.ALL_KEYS, null);
+ new BigtableSource(getBigtableService(), tableId, filter, keyRange, null);
return input.getPipeline().apply(org.apache.beam.sdk.io.Read.from(source));
}
@@ -284,6 +311,9 @@ public class BigtableIO {
.withLabel("Bigtable Options"));
}
+ builder.addIfNotDefault(
+ DisplayData.item("keyRange", keyRange.toString()), ByteKeyRange.ALL_KEYS.toString());
+
if (filter != null) {
builder.add(DisplayData.item("rowFilter", filter.toString())
.withLabel("Table Row Filter"));
@@ -295,6 +325,7 @@ public class BigtableIO {
return MoreObjects.toStringHelper(Read.class)
.add("options", options)
.add("tableId", tableId)
+ .add("keyRange", keyRange)
.add("filter", filter)
.toString();
}
@@ -307,16 +338,19 @@ public class BigtableIO {
*/
@Nullable private final BigtableOptions options;
private final String tableId;
+ private final ByteKeyRange keyRange;
@Nullable private final RowFilter filter;
@Nullable private final BigtableService bigtableService;
private Read(
@Nullable BigtableOptions options,
String tableId,
+ ByteKeyRange keyRange,
@Nullable RowFilter filter,
@Nullable BigtableService bigtableService) {
this.options = options;
this.tableId = checkNotNull(tableId, "tableId");
+ this.keyRange = checkNotNull(keyRange, "keyRange");
this.filter = filter;
this.bigtableService = bigtableService;
}
@@ -331,7 +365,7 @@ public class BigtableIO {
*/
Read withBigtableService(BigtableService bigtableService) {
checkNotNull(bigtableService, "bigtableService");
- return new Read(options, tableId, filter, bigtableService);
+ return new Read(options, tableId, keyRange, filter, bigtableService);
}
/**
@@ -615,7 +649,7 @@ public class BigtableIO {
String tableId,
@Nullable RowFilter filter,
ByteKeyRange range,
- Long estimatedSizeBytes) {
+ @Nullable Long estimatedSizeBytes) {
this.service = service;
this.tableId = tableId;
this.filter = filter;
@@ -635,7 +669,7 @@ public class BigtableIO {
////// Private state and internal implementation details //////
private final BigtableService service;
- @Nullable private final String tableId;
+ private final String tableId;
@Nullable private final RowFilter filter;
private final ByteKeyRange range;
@Nullable private Long estimatedSizeBytes;
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dace48c7/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
index d60ede6..f21e6c0 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigtable/BigtableIOTest.java
@@ -28,7 +28,11 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasLabel;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasValue;
import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.hasItems;
import static org.hamcrest.Matchers.hasSize;
+import static org.hamcrest.Matchers.lessThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
@@ -49,6 +53,7 @@ import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.protobuf.ByteString;
@@ -218,12 +223,8 @@ public class BigtableIOTest {
final String table = "TEST-EMPTY-TABLE";
service.createTable(table);
- TestPipeline p = TestPipeline.create();
- PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
- PAssert.that(rows).empty();
-
- p.run();
- logged.verifyInfo(String.format("Closing reader after reading 0 records."));
+ runReadTest(defaultRead.withTableId(table), new ArrayList<Row>());
+ logged.verifyInfo("Closing reader after reading 0 records.");
}
/** Tests reading all rows from a table. */
@@ -233,11 +234,7 @@ public class BigtableIOTest {
final int numRows = 1001;
List<Row> testRows = makeTableData(table, numRows);
- TestPipeline p = TestPipeline.create();
- PCollection<Row> rows = p.apply(defaultRead.withTableId(table));
- PAssert.that(rows).containsInAnyOrder(testRows);
-
- p.run();
+ runReadTest(defaultRead.withTableId(table), testRows);
logged.verifyInfo(String.format("Closing reader after reading %d records.", numRows));
}
@@ -256,6 +253,68 @@ public class BigtableIOTest {
}
}
+ private static List<Row> filterToRange(List<Row> rows, final ByteKeyRange range) {
+ return Lists.newArrayList(Iterables.filter(
+ rows,
+ new Predicate<Row>() {
+ @Override
+ public boolean apply(@Nullable Row input) {
+ verifyNotNull(input, "input");
+ return range.containsKey(ByteKey.of(input.getKey()));
+ }
+ }));
+ }
+
+ private static void runReadTest(BigtableIO.Read read, List<Row> expected) {
+ TestPipeline p = TestPipeline.create();
+ PCollection<Row> rows = p.apply(read);
+ PAssert.that(rows).containsInAnyOrder(expected);
+ p.run();
+ }
+
+ /**
+ * Tests reading all rows using key ranges. Tests a prefix [), a suffix [), and a restricted
+ * range [) and that some properties hold across them.
+ */
+ @Test
+ public void testReadingWithKeyRange() throws Exception {
+ final String table = "TEST-KEY-RANGE-TABLE";
+ final int numRows = 1001;
+ List<Row> testRows = makeTableData(table, numRows);
+ ByteKey startKey = ByteKey.copyFrom("key000000100".getBytes());
+ ByteKey endKey = ByteKey.copyFrom("key000000300".getBytes());
+
+ // Test prefix: [beginning, startKey).
+ final ByteKeyRange prefixRange = ByteKeyRange.ALL_KEYS.withEndKey(startKey);
+ List<Row> prefixRows = filterToRange(testRows, prefixRange);
+ runReadTest(defaultRead.withTableId(table).withKeyRange(prefixRange), prefixRows);
+
+ // Test suffix: [startKey, end).
+ final ByteKeyRange suffixRange = ByteKeyRange.ALL_KEYS.withStartKey(startKey);
+ List<Row> suffixRows = filterToRange(testRows, suffixRange);
+ runReadTest(defaultRead.withTableId(table).withKeyRange(suffixRange), suffixRows);
+
+ // Test restricted range: [startKey, endKey).
+ final ByteKeyRange middleRange = ByteKeyRange.of(startKey, endKey);
+ List<Row> middleRows = filterToRange(testRows, middleRange);
+ runReadTest(defaultRead.withTableId(table).withKeyRange(middleRange), middleRows);
+
+ //////// Size and content sanity checks //////////
+
+ // Prefix, suffix, middle should be non-trivial (non-zero, non-all).
+ assertThat(prefixRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0))));
+ assertThat(suffixRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0))));
+ assertThat(middleRows, allOf(hasSize(lessThan(numRows)), hasSize(greaterThan(0))));
+
+ // Prefix + suffix should be exactly all rows.
+ List<Row> union = Lists.newArrayList(prefixRows);
+ union.addAll(suffixRows);
+ assertThat("prefix + suffix = total", union, containsInAnyOrder(testRows.toArray(new Row[]{})));
+
+ // Suffix should contain the middle.
+ assertThat(suffixRows, hasItems(middleRows.toArray(new Row[]{})));
+ }
+
/** Tests reading all rows using a filter. */
@Test
public void testReadingWithFilter() throws Exception {
@@ -278,11 +337,8 @@ public class BigtableIOTest {
RowFilter filter =
RowFilter.newBuilder().setRowKeyRegexFilter(ByteString.copyFromUtf8(regex)).build();
- TestPipeline p = TestPipeline.create();
- PCollection<Row> rows = p.apply(defaultRead.withTableId(table).withRowFilter(filter));
- PAssert.that(rows).containsInAnyOrder(filteredRows);
-
- p.run();
+ runReadTest(
+ defaultRead.withTableId(table).withRowFilter(filter), Lists.newArrayList(filteredRows));
}
/**
@@ -408,10 +464,12 @@ public class BigtableIOTest {
.setRowKeyRegexFilter(ByteString.copyFromUtf8("foo.*"))
.build();
+ ByteKeyRange keyRange = ByteKeyRange.ALL_KEYS.withEndKey(ByteKey.of(0xab, 0xcd));
BigtableIO.Read read = BigtableIO.read()
.withBigtableOptions(BIGTABLE_OPTIONS)
.withTableId("fooTable")
- .withRowFilter(rowFilter);
+ .withRowFilter(rowFilter)
+ .withKeyRange(keyRange);
DisplayData displayData = DisplayData.from(read);
@@ -419,8 +477,11 @@ public class BigtableIOTest {
hasKey("tableId"),
hasLabel("Table ID"),
hasValue("fooTable"))));
+
assertThat(displayData, hasDisplayItem("rowFilter", rowFilter.toString()));
+ assertThat(displayData, hasDisplayItem("keyRange", keyRange.toString()));
+
// BigtableIO adds user-agent to options; assert only on key and not value.
assertThat(displayData, hasDisplayItem("bigtableOptions"));
}
[2/2] incubator-beam git commit: Closes #969
Posted by dh...@apache.org.
Closes #969
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/62c56c99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/62c56c99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/62c56c99
Branch: refs/heads/master
Commit: 62c56c99b109e060aad6d210d81163a1fda36825
Parents: 984b32f dace48c
Author: Dan Halperin <dh...@google.com>
Authored: Mon Sep 19 15:05:18 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Mon Sep 19 15:05:18 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/bigtable/BigtableIO.java | 54 ++++++++---
.../sdk/io/gcp/bigtable/BigtableIOTest.java | 95 ++++++++++++++++----
2 files changed, 122 insertions(+), 27 deletions(-)
----------------------------------------------------------------------