You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/07/20 17:59:21 UTC
[1/2] beam git commit: Introduces SpannerIO.readAll()
Repository: beam
Updated Branches:
refs/heads/master c8e3744ad -> afeba3715
Introduces SpannerIO.readAll()
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95e9c28c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95e9c28c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95e9c28c
Branch: refs/heads/master
Commit: 95e9c28ca4da5bac31f3d768595693e43b464c1c
Parents: c8e3744
Author: Mairbek Khadikov <ma...@google.com>
Authored: Tue Jul 18 16:23:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 10:58:51 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 35 ++--
.../beam/sdk/io/gcp/spanner/ReadOperation.java | 96 ++++++++++
.../beam/sdk/io/gcp/spanner/SpannerIO.java | 187 ++++++++++++++-----
.../sdk/io/gcp/spanner/SpannerIOReadTest.java | 145 +++++++++-----
4 files changed, 353 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
index d193b95..92b3fe3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
@@ -22,44 +22,53 @@ import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.PCollectionView;
/** The simplest read function implementation. Parallelism support is coming. */
@VisibleForTesting
-class NaiveSpannerReadFn extends AbstractSpannerFn<Object, Struct> {
- private final SpannerIO.Read config;
+class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
+ private final SpannerConfig config;
+ @Nullable private final PCollectionView<Transaction> transaction;
- NaiveSpannerReadFn(SpannerIO.Read config) {
+ NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) {
this.config = config;
+ this.transaction = transaction;
+ }
+
+ NaiveSpannerReadFn(SpannerConfig config) {
+ this(config, null);
}
SpannerConfig getSpannerConfig() {
- return config.getSpannerConfig();
+ return config;
}
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
TimestampBound timestampBound = TimestampBound.strong();
- if (config.getTransaction() != null) {
- Transaction transaction = c.sideInput(config.getTransaction());
+ if (transaction != null) {
+ Transaction transaction = c.sideInput(this.transaction);
timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp());
}
+ ReadOperation op = c.element();
try (ReadOnlyTransaction readOnlyTransaction =
databaseClient().readOnlyTransaction(timestampBound)) {
- ResultSet resultSet = execute(readOnlyTransaction);
+ ResultSet resultSet = execute(op, readOnlyTransaction);
while (resultSet.next()) {
c.output(resultSet.getCurrentRowAsStruct());
}
}
}
- private ResultSet execute(ReadOnlyTransaction readOnlyTransaction) {
- if (config.getQuery() != null) {
- return readOnlyTransaction.executeQuery(config.getQuery());
+ private ResultSet execute(ReadOperation op, ReadOnlyTransaction readOnlyTransaction) {
+ if (op.getQuery() != null) {
+ return readOnlyTransaction.executeQuery(op.getQuery());
}
- if (config.getIndex() != null) {
+ if (op.getIndex() != null) {
return readOnlyTransaction.readUsingIndex(
- config.getTable(), config.getIndex(), config.getKeySet(), config.getColumns());
+ op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns());
}
- return readOnlyTransaction.read(config.getTable(), config.getKeySet(), config.getColumns());
+ return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns());
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
new file mode 100644
index 0000000..3b2bb6b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Statement;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/** Encapsulates a single Cloud Spanner read operation: a SQL query, or a table or index read. */
+@AutoValue
+public abstract class ReadOperation implements Serializable {
+
+ public static ReadOperation create() {
+ return new AutoValue_ReadOperation.Builder().setKeySet(KeySet.all()).build();
+ }
+
+ @Nullable
+ public abstract Statement getQuery();
+
+ @Nullable
+ public abstract String getTable();
+
+ @Nullable
+ public abstract String getIndex();
+
+ @Nullable
+ public abstract List<String> getColumns();
+
+ @Nullable
+ public abstract KeySet getKeySet();
+
+ @AutoValue.Builder
+ abstract static class Builder {
+
+ abstract Builder setQuery(Statement statement);
+
+ abstract Builder setTable(String table);
+
+ abstract Builder setIndex(String index);
+
+ abstract Builder setColumns(List<String> columns);
+
+ abstract Builder setKeySet(KeySet keySet);
+
+ abstract ReadOperation build();
+ }
+
+ abstract Builder toBuilder();
+
+ public ReadOperation withTable(String table) {
+ return toBuilder().setTable(table).build();
+ }
+
+ public ReadOperation withColumns(String... columns) {
+ return withColumns(Arrays.asList(columns));
+ }
+
+ public ReadOperation withColumns(List<String> columns) {
+ return toBuilder().setColumns(columns).build();
+ }
+
+ public ReadOperation withQuery(Statement statement) {
+ return toBuilder().setQuery(statement).build();
+ }
+
+ public ReadOperation withQuery(String sql) {
+ return withQuery(Statement.of(sql));
+ }
+
+ public ReadOperation withKeySet(KeySet keySet) {
+ return toBuilder().setKeySet(keySet).build();
+ }
+
+ public ReadOperation withIndex(String index) {
+ return toBuilder().setIndex(index).build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index a247d4c..e5c9c05 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -31,12 +31,11 @@ import com.google.cloud.spanner.Statement;
import com.google.cloud.spanner.Struct;
import com.google.cloud.spanner.TimestampBound;
import com.google.common.annotations.VisibleForTesting;
-
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import javax.annotation.Nullable;
-
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
@@ -44,7 +43,11 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
@@ -172,7 +175,18 @@ public class SpannerIO {
return new AutoValue_SpannerIO_Read.Builder()
.setSpannerConfig(SpannerConfig.create())
.setTimestampBound(TimestampBound.strong())
- .setKeySet(KeySet.all())
+ .setReadOperation(ReadOperation.create())
+ .build();
+ }
+
+ /**
+ * A {@link PTransform} that works like {@link #read}, but executes read operations coming from a
+ * {@link PCollection}.
+ */
+ @Experimental(Experimental.Kind.SOURCE_SINK)
+ public static ReadAll readAll() {
+ return new AutoValue_SpannerIO_ReadAll.Builder()
+ .setSpannerConfig(SpannerConfig.create())
.build();
}
@@ -202,34 +216,113 @@ public class SpannerIO {
.build();
}
- /**
- * A {@link PTransform} that reads data from Google Cloud Spanner.
- *
- * @see SpannerIO
- */
+ /** Implementation of {@link #readAll}. */
@Experimental(Experimental.Kind.SOURCE_SINK)
@AutoValue
- public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> {
+ public abstract static class ReadAll
+ extends PTransform<PCollection<ReadOperation>, PCollection<Struct>> {
abstract SpannerConfig getSpannerConfig();
@Nullable
- abstract TimestampBound getTimestampBound();
+ abstract PCollectionView<Transaction> getTransaction();
- @Nullable
- abstract Statement getQuery();
+ abstract Builder toBuilder();
- @Nullable
- abstract String getTable();
+ @AutoValue.Builder
+ abstract static class Builder {
+ abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
- @Nullable
- abstract String getIndex();
+ abstract Builder setTransaction(PCollectionView<Transaction> transaction);
- @Nullable
- abstract List<String> getColumns();
+ abstract ReadAll build();
+ }
+
+ /** Specifies the Cloud Spanner configuration. */
+ public ReadAll withSpannerConfig(SpannerConfig spannerConfig) {
+ return toBuilder().setSpannerConfig(spannerConfig).build();
+ }
+
+ /** Specifies the Cloud Spanner project. */
+ public ReadAll withProjectId(String projectId) {
+ return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+ }
+
+ /** Specifies the Cloud Spanner project. */
+ public ReadAll withProjectId(ValueProvider<String> projectId) {
+ SpannerConfig config = getSpannerConfig();
+ return withSpannerConfig(config.withProjectId(projectId));
+ }
+
+ /** Specifies the Cloud Spanner instance. */
+ public ReadAll withInstanceId(String instanceId) {
+ return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+ }
+
+ /** Specifies the Cloud Spanner instance. */
+ public ReadAll withInstanceId(ValueProvider<String> instanceId) {
+ SpannerConfig config = getSpannerConfig();
+ return withSpannerConfig(config.withInstanceId(instanceId));
+ }
+
+ /** Specifies the Cloud Spanner database. */
+ public ReadAll withDatabaseId(String databaseId) {
+ return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
+ }
+
+ /** Specifies the Cloud Spanner database. */
+ public ReadAll withDatabaseId(ValueProvider<String> databaseId) {
+ SpannerConfig config = getSpannerConfig();
+ return withSpannerConfig(config.withDatabaseId(databaseId));
+ }
+
+ @VisibleForTesting
+ ReadAll withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
+ SpannerConfig config = getSpannerConfig();
+ return withSpannerConfig(config.withServiceFactory(serviceFactory));
+ }
+
+ public ReadAll withTransaction(PCollectionView<Transaction> transaction) {
+ return toBuilder().setTransaction(transaction).build();
+ }
+
+ @Override
+ public PCollection<Struct> expand(PCollection<ReadOperation> input) {
+ PCollection<ReadOperation> reshuffled =
+ input
+ .apply(
+ "Pair with random key",
+ WithKeys.of(
+ new SerializableFunction<ReadOperation, String>() {
+ @Override
+ public String apply(ReadOperation ignored) {
+ return UUID.randomUUID().toString();
+ }
+ }))
+ .apply("Reshuffle", Reshuffle.<String, ReadOperation>of())
+ .apply("Strip keys", Values.<ReadOperation>create());
+ List<PCollectionView<Transaction>> sideInputs =
+ getTransaction() == null
+ ? Collections.<PCollectionView<Transaction>>emptyList()
+ : Collections.singletonList(getTransaction());
+ return reshuffled.apply(
+ "Execute queries",
+ ParDo.of(new NaiveSpannerReadFn(getSpannerConfig(), getTransaction()))
+ .withSideInputs(sideInputs));
+ }
+ }
+
+ /** Implementation of {@link #read}. */
+ @Experimental(Experimental.Kind.SOURCE_SINK)
+ @AutoValue
+ public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> {
+
+ abstract SpannerConfig getSpannerConfig();
+
+ abstract ReadOperation getReadOperation();
@Nullable
- abstract KeySet getKeySet();
+ abstract TimestampBound getTimestampBound();
@Nullable
abstract PCollectionView<Transaction> getTransaction();
@@ -241,17 +334,9 @@ public class SpannerIO {
abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
- abstract Builder setTimestampBound(TimestampBound timestampBound);
-
- abstract Builder setQuery(Statement statement);
-
- abstract Builder setTable(String table);
-
- abstract Builder setIndex(String index);
+ abstract Builder setReadOperation(ReadOperation readOperation);
- abstract Builder setColumns(List<String> columns);
-
- abstract Builder setKeySet(KeySet keySet);
+ abstract Builder setTimestampBound(TimestampBound timestampBound);
abstract Builder setTransaction(PCollectionView<Transaction> transaction);
@@ -315,7 +400,11 @@ public class SpannerIO {
}
public Read withTable(String table) {
- return toBuilder().setTable(table).build();
+ return withReadOperation(getReadOperation().withTable(table));
+ }
+
+ public Read withReadOperation(ReadOperation operation) {
+ return toBuilder().setReadOperation(operation).build();
}
public Read withColumns(String... columns) {
@@ -323,11 +412,11 @@ public class SpannerIO {
}
public Read withColumns(List<String> columns) {
- return toBuilder().setColumns(columns).build();
+ return withReadOperation(getReadOperation().withColumns(columns));
}
public Read withQuery(Statement statement) {
- return toBuilder().setQuery(statement).build();
+ return withReadOperation(getReadOperation().withQuery(statement));
}
public Read withQuery(String sql) {
@@ -335,14 +424,13 @@ public class SpannerIO {
}
public Read withKeySet(KeySet keySet) {
- return toBuilder().setKeySet(keySet).build();
+ return withReadOperation(getReadOperation().withKeySet(keySet));
}
public Read withIndex(String index) {
- return toBuilder().setIndex(index).build();
+ return withReadOperation(getReadOperation().withIndex(index));
}
-
@Override
public void validate(PipelineOptions options) {
getSpannerConfig().validate(options);
@@ -351,16 +439,16 @@ public class SpannerIO {
"SpannerIO.read() runs in a read only transaction and requires timestamp to be set "
+ "with withTimestampBound or withTimestamp method");
- if (getQuery() != null) {
+ if (getReadOperation().getQuery() != null) {
// TODO: validate query?
- } else if (getTable() != null) {
+ } else if (getReadOperation().getTable() != null) {
// Assume read
checkNotNull(
- getColumns(),
+ getReadOperation().getColumns(),
"For a read operation SpannerIO.read() requires a list of "
+ "columns to set with withColumns method");
checkArgument(
- !getColumns().isEmpty(),
+ !getReadOperation().getColumns().isEmpty(),
"For a read operation SpannerIO.read() requires a"
+ " list of columns to set with withColumns method");
} else {
@@ -371,18 +459,17 @@ public class SpannerIO {
@Override
public PCollection<Struct> expand(PBegin input) {
- Read config = this;
- List<PCollectionView<Transaction>> sideInputs = Collections.emptyList();
- if (getTimestampBound() != null) {
- PCollectionView<Transaction> transaction =
- input.apply(createTransaction().withSpannerConfig(getSpannerConfig()));
- config = config.withTransaction(transaction);
- sideInputs = Collections.singletonList(transaction);
+ PCollectionView<Transaction> transaction = getTransaction();
+ if (transaction == null && getTimestampBound() != null) {
+ transaction =
+ input.apply(
+ createTransaction()
+ .withTimestampBound(getTimestampBound())
+ .withSpannerConfig(getSpannerConfig()));
}
- return input
- .apply(Create.of(1))
- .apply(
- "Execute query", ParDo.of(new NaiveSpannerReadFn(config)).withSideInputs(sideInputs));
+ ReadAll readAll =
+ readAll().withSpannerConfig(getSpannerConfig()).withTransaction(transaction);
+ return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
index 5ba2da0..6eb1a33 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
@@ -39,6 +39,7 @@ import java.util.List;
import org.apache.beam.sdk.testing.NeedsRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
@@ -55,6 +56,7 @@ import org.mockito.Mockito;
/** Unit tests for {@link SpannerIO}. */
@RunWith(JUnit4.class)
public class SpannerIOReadTest implements Serializable {
+
@Rule
public final transient TestPipeline pipeline = TestPipeline.create();
@Rule
@@ -63,12 +65,16 @@ public class SpannerIOReadTest implements Serializable {
private FakeServiceFactory serviceFactory;
private ReadOnlyTransaction mockTx;
- private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()),
- Type.StructField.of("name", Type.string()));
+ private static final Type FAKE_TYPE =
+ Type.struct(
+ Type.StructField.of("id", Type.int64()), Type.StructField.of("name", Type.string()));
- private List<Struct> fakeRows = Arrays.asList(
- Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(),
- Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build());
+ private static final List<Struct> FAKE_ROWS =
+ Arrays.asList(
+ Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(),
+ Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build(),
+ Struct.newBuilder().add("id", Value.int64(3)).add("name", Value.string("Carl")).build(),
+ Struct.newBuilder().add("id", Value.int64(4)).add("name", Value.string("Dan")).build());
@Before
@SuppressWarnings("unchecked")
@@ -153,20 +159,19 @@ public class SpannerIOReadTest implements Serializable {
.withProjectId("test")
.withInstanceId("123")
.withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
.withQuery("SELECT * FROM users")
.withServiceFactory(serviceFactory);
- NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
- DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
+ NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
+ DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);
when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
.thenReturn(mockTx);
when(mockTx.executeQuery(any(Statement.class)))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+ .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
- List<Struct> result = fnTester.processBundle(1);
- assertThat(result, Matchers.<Struct>iterableWithSize(2));
+ List<Struct> result = fnTester.processBundle(read.getReadOperation());
+ assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));
verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound
.strong());
@@ -180,21 +185,20 @@ public class SpannerIOReadTest implements Serializable {
.withProjectId("test")
.withInstanceId("123")
.withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
.withTable("users")
.withColumns("id", "name")
.withServiceFactory(serviceFactory);
- NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
- DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
+ NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
+ DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);
when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
.thenReturn(mockTx);
when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+ .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
- List<Struct> result = fnTester.processBundle(1);
- assertThat(result, Matchers.<Struct>iterableWithSize(2));
+ List<Struct> result = fnTester.processBundle(read.getReadOperation());
+ assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));
verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name"));
@@ -213,16 +217,16 @@ public class SpannerIOReadTest implements Serializable {
.withIndex("theindex")
.withServiceFactory(serviceFactory);
- NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
- DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
+ NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
+ DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);
when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
.thenReturn(mockTx);
when(mockTx.readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+ .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
- List<Struct> result = fnTester.processBundle(1);
- assertThat(result, Matchers.<Struct>iterableWithSize(2));
+ List<Struct> result = fnTester.processBundle(read.getReadOperation());
+ assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));
verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
verify(mockTx).readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name"));
@@ -233,30 +237,32 @@ public class SpannerIOReadTest implements Serializable {
public void readPipeline() throws Exception {
Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
- PCollectionView<Transaction> tx = pipeline
- .apply("tx", SpannerIO.createTransaction()
+ SpannerConfig spannerConfig =
+ SpannerConfig.create()
.withProjectId("test")
.withInstanceId("123")
.withDatabaseId("aaa")
- .withServiceFactory(serviceFactory));
-
- PCollection<Struct> one = pipeline.apply("read q", SpannerIO.read()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withQuery("SELECT * FROM users")
- .withServiceFactory(serviceFactory)
- .withTransaction(tx));
- PCollection<Struct> two = pipeline.apply("read r", SpannerIO.read()
- .withProjectId("test")
- .withInstanceId("123")
- .withDatabaseId("aaa")
- .withTimestamp(Timestamp.now())
- .withTable("users")
- .withColumns("id", "name")
- .withServiceFactory(serviceFactory)
- .withTransaction(tx));
+ .withServiceFactory(serviceFactory);
+
+ PCollectionView<Transaction> tx =
+ pipeline.apply("tx", SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
+
+ PCollection<Struct> one =
+ pipeline.apply(
+ "read q",
+ SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withQuery("SELECT * FROM users")
+ .withTransaction(tx));
+ PCollection<Struct> two =
+ pipeline.apply(
+ "read r",
+ SpannerIO.read()
+ .withSpannerConfig(spannerConfig)
+ .withTimestamp(Timestamp.now())
+ .withTable("users")
+ .withColumns("id", "name")
+ .withTransaction(tx));
when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
.thenReturn(mockTx);
@@ -265,13 +271,58 @@ public class SpannerIOReadTest implements Serializable {
Collections.<Struct>emptyList()));
when(mockTx.executeQuery(Statement.of("SELECT * FROM users")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+ .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
+ when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
+ .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
+ when(mockTx.getReadTimestamp()).thenReturn(timestamp);
+
+ PAssert.that(one).containsInAnyOrder(FAKE_ROWS);
+ PAssert.that(two).containsInAnyOrder(FAKE_ROWS);
+
+ pipeline.run();
+
+ verify(serviceFactory.mockDatabaseClient(), times(2))
+ .readOnlyTransaction(TimestampBound.ofReadTimestamp(timestamp));
+ }
+
+ @Test
+ @Category(NeedsRunner.class)
+ public void readAllPipeline() throws Exception {
+ Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
+
+ SpannerConfig spannerConfig =
+ SpannerConfig.create()
+ .withProjectId("test")
+ .withInstanceId("123")
+ .withDatabaseId("aaa")
+ .withServiceFactory(serviceFactory);
+
+ PCollectionView<Transaction> tx =
+ pipeline.apply("tx", SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
+
+ PCollection<ReadOperation> reads =
+ pipeline.apply(
+ Create.of(
+ ReadOperation.create().withQuery("SELECT * FROM users"),
+ ReadOperation.create().withTable("users").withColumns("id", "name")));
+
+ PCollection<Struct> one =
+ reads.apply(
+ "read all", SpannerIO.readAll().withSpannerConfig(spannerConfig).withTransaction(tx));
+
+ when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
+ .thenReturn(mockTx);
+
+ when(mockTx.executeQuery(Statement.of("SELECT 1")))
+ .thenReturn(ResultSets.forRows(Type.struct(), Collections.<Struct>emptyList()));
+
+ when(mockTx.executeQuery(Statement.of("SELECT * FROM users")))
+ .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)));
when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
- .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+ .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)));
when(mockTx.getReadTimestamp()).thenReturn(timestamp);
- PAssert.that(one).containsInAnyOrder(fakeRows);
- PAssert.that(two).containsInAnyOrder(fakeRows);
+ PAssert.that(one).containsInAnyOrder(FAKE_ROWS);
pipeline.run();
[2/2] beam git commit: This closes #3591: [BEAM-1542] Introduced
SpannerIO.readAll
Posted by jk...@apache.org.
This closes #3591: [BEAM-1542] Introduced SpannerIO.readAll
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/afeba371
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/afeba371
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/afeba371
Branch: refs/heads/master
Commit: afeba3715c806b53115f8f7994eb7bc207c68932
Parents: c8e3744 95e9c28
Author: Eugene Kirpichov <ki...@google.com>
Authored: Thu Jul 20 10:59:14 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 10:59:14 2017 -0700
----------------------------------------------------------------------
.../sdk/io/gcp/spanner/NaiveSpannerReadFn.java | 35 ++--
.../beam/sdk/io/gcp/spanner/ReadOperation.java | 96 ++++++++++
.../beam/sdk/io/gcp/spanner/SpannerIO.java | 187 ++++++++++++++-----
.../sdk/io/gcp/spanner/SpannerIOReadTest.java | 145 +++++++++-----
4 files changed, 353 insertions(+), 110 deletions(-)
----------------------------------------------------------------------