You are viewing a plain text version of this content. The hyperlink to its canonical version was not preserved in this plain-text rendering.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/20 19:53:44 UTC

[48/50] [abbrv] beam git commit: Introduces SpannerIO.readAll()

Introduces SpannerIO.readAll()


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/95e9c28c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/95e9c28c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/95e9c28c

Branch: refs/heads/DSL_SQL
Commit: 95e9c28ca4da5bac31f3d768595693e43b464c1c
Parents: c8e3744
Author: Mairbek Khadikov <ma...@google.com>
Authored: Tue Jul 18 16:23:58 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Jul 20 10:58:51 2017 -0700

----------------------------------------------------------------------
 .../sdk/io/gcp/spanner/NaiveSpannerReadFn.java  |  35 ++--
 .../beam/sdk/io/gcp/spanner/ReadOperation.java  |  96 ++++++++++
 .../beam/sdk/io/gcp/spanner/SpannerIO.java      | 187 ++++++++++++++-----
 .../sdk/io/gcp/spanner/SpannerIOReadTest.java   | 145 +++++++++-----
 4 files changed, 353 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
index d193b95..92b3fe3 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/NaiveSpannerReadFn.java
@@ -22,44 +22,53 @@ import com.google.cloud.spanner.ResultSet;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
 import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.values.PCollectionView;
 
 /** A simplest read function implementation. Parallelism support is coming. */
 @VisibleForTesting
-class NaiveSpannerReadFn extends AbstractSpannerFn<Object, Struct> {
-  private final SpannerIO.Read config;
+class NaiveSpannerReadFn extends AbstractSpannerFn<ReadOperation, Struct> {
+  private final SpannerConfig config;
+  @Nullable private final PCollectionView<Transaction> transaction;
 
-  NaiveSpannerReadFn(SpannerIO.Read config) {
+  NaiveSpannerReadFn(SpannerConfig config, @Nullable PCollectionView<Transaction> transaction) {
     this.config = config;
+    this.transaction = transaction;
+  }
+
+  NaiveSpannerReadFn(SpannerConfig config) {
+    this(config, null);
   }
 
   SpannerConfig getSpannerConfig() {
-    return config.getSpannerConfig();
+    return config;
   }
 
   @ProcessElement
   public void processElement(ProcessContext c) throws Exception {
     TimestampBound timestampBound = TimestampBound.strong();
-    if (config.getTransaction() != null) {
-      Transaction transaction = c.sideInput(config.getTransaction());
+    if (transaction != null) {
+      Transaction transaction = c.sideInput(this.transaction);
       timestampBound = TimestampBound.ofReadTimestamp(transaction.timestamp());
     }
+    ReadOperation op = c.element();
     try (ReadOnlyTransaction readOnlyTransaction =
         databaseClient().readOnlyTransaction(timestampBound)) {
-      ResultSet resultSet = execute(readOnlyTransaction);
+      ResultSet resultSet = execute(op, readOnlyTransaction);
       while (resultSet.next()) {
         c.output(resultSet.getCurrentRowAsStruct());
       }
     }
   }
 
-  private ResultSet execute(ReadOnlyTransaction readOnlyTransaction) {
-    if (config.getQuery() != null) {
-      return readOnlyTransaction.executeQuery(config.getQuery());
+  private ResultSet execute(ReadOperation op, ReadOnlyTransaction readOnlyTransaction) {
+    if (op.getQuery() != null) {
+      return readOnlyTransaction.executeQuery(op.getQuery());
     }
-    if (config.getIndex() != null) {
+    if (op.getIndex() != null) {
       return readOnlyTransaction.readUsingIndex(
-          config.getTable(), config.getIndex(), config.getKeySet(), config.getColumns());
+          op.getTable(), op.getIndex(), op.getKeySet(), op.getColumns());
     }
-    return readOnlyTransaction.read(config.getTable(), config.getKeySet(), config.getColumns());
+    return readOnlyTransaction.read(op.getTable(), op.getKeySet(), op.getColumns());
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
new file mode 100644
index 0000000..3b2bb6b
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/ReadOperation.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.spanner;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.spanner.KeySet;
+import com.google.cloud.spanner.Statement;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import javax.annotation.Nullable;
+
+/** Encapsulates a spanner read operation. */
+@AutoValue
+public abstract class ReadOperation implements Serializable {
+
+  public static ReadOperation create() {
+    return new AutoValue_ReadOperation.Builder().setKeySet(KeySet.all()).build();
+  }
+
+  @Nullable
+  public abstract Statement getQuery();
+
+  @Nullable
+  public abstract String getTable();
+
+  @Nullable
+  public abstract String getIndex();
+
+  @Nullable
+  public abstract List<String> getColumns();
+
+  @Nullable
+  public abstract KeySet getKeySet();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    abstract Builder setQuery(Statement statement);
+
+    abstract Builder setTable(String table);
+
+    abstract Builder setIndex(String index);
+
+    abstract Builder setColumns(List<String> columns);
+
+    abstract Builder setKeySet(KeySet keySet);
+
+    abstract ReadOperation build();
+  }
+
+  abstract Builder toBuilder();
+
+  public ReadOperation withTable(String table) {
+    return toBuilder().setTable(table).build();
+  }
+
+  public ReadOperation withColumns(String... columns) {
+    return withColumns(Arrays.asList(columns));
+  }
+
+  public ReadOperation withColumns(List<String> columns) {
+    return toBuilder().setColumns(columns).build();
+  }
+
+  public ReadOperation withQuery(Statement statement) {
+    return toBuilder().setQuery(statement).build();
+  }
+
+  public ReadOperation withQuery(String sql) {
+    return withQuery(Statement.of(sql));
+  }
+
+  public ReadOperation withKeySet(KeySet keySet) {
+    return toBuilder().setKeySet(keySet).build();
+  }
+
+  public ReadOperation withIndex(String index) {
+    return toBuilder().setIndex(index).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index a247d4c..e5c9c05 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -31,12 +31,11 @@ import com.google.cloud.spanner.Statement;
 import com.google.cloud.spanner.Struct;
 import com.google.cloud.spanner.TimestampBound;
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import javax.annotation.Nullable;
-
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
@@ -44,7 +43,11 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -172,7 +175,18 @@ public class SpannerIO {
     return new AutoValue_SpannerIO_Read.Builder()
         .setSpannerConfig(SpannerConfig.create())
         .setTimestampBound(TimestampBound.strong())
-        .setKeySet(KeySet.all())
+        .setReadOperation(ReadOperation.create())
+        .build();
+  }
+
+  /**
+   * A {@link PTransform} that works like {@link #read}, but executes read operations coming from a
+   * {@link PCollection}.
+   */
+  @Experimental(Experimental.Kind.SOURCE_SINK)
+  public static ReadAll readAll() {
+    return new AutoValue_SpannerIO_ReadAll.Builder()
+        .setSpannerConfig(SpannerConfig.create())
         .build();
   }
 
@@ -202,34 +216,113 @@ public class SpannerIO {
         .build();
   }
 
-  /**
-   * A {@link PTransform} that reads data from Google Cloud Spanner.
-   *
-   * @see SpannerIO
-   */
+  /** Implementation of {@link #readAll}. */
   @Experimental(Experimental.Kind.SOURCE_SINK)
   @AutoValue
-  public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> {
+  public abstract static class ReadAll
+      extends PTransform<PCollection<ReadOperation>, PCollection<Struct>> {
 
     abstract SpannerConfig getSpannerConfig();
 
     @Nullable
-    abstract TimestampBound getTimestampBound();
+    abstract PCollectionView<Transaction> getTransaction();
 
-    @Nullable
-    abstract Statement getQuery();
+    abstract Builder toBuilder();
 
-    @Nullable
-    abstract String getTable();
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
 
-    @Nullable
-    abstract String getIndex();
+      abstract Builder setTransaction(PCollectionView<Transaction> transaction);
 
-    @Nullable
-    abstract List<String> getColumns();
+      abstract ReadAll build();
+    }
+
+    /** Specifies the Cloud Spanner configuration. */
+    public ReadAll withSpannerConfig(SpannerConfig spannerConfig) {
+      return toBuilder().setSpannerConfig(spannerConfig).build();
+    }
+
+    /** Specifies the Cloud Spanner project. */
+    public ReadAll withProjectId(String projectId) {
+      return withProjectId(ValueProvider.StaticValueProvider.of(projectId));
+    }
+
+    /** Specifies the Cloud Spanner project. */
+    public ReadAll withProjectId(ValueProvider<String> projectId) {
+      SpannerConfig config = getSpannerConfig();
+      return withSpannerConfig(config.withProjectId(projectId));
+    }
+
+    /** Specifies the Cloud Spanner instance. */
+    public ReadAll withInstanceId(String instanceId) {
+      return withInstanceId(ValueProvider.StaticValueProvider.of(instanceId));
+    }
+
+    /** Specifies the Cloud Spanner instance. */
+    public ReadAll withInstanceId(ValueProvider<String> instanceId) {
+      SpannerConfig config = getSpannerConfig();
+      return withSpannerConfig(config.withInstanceId(instanceId));
+    }
+
+    /** Specifies the Cloud Spanner database. */
+    public ReadAll withDatabaseId(String databaseId) {
+      return withDatabaseId(ValueProvider.StaticValueProvider.of(databaseId));
+    }
+
+    /** Specifies the Cloud Spanner database. */
+    public ReadAll withDatabaseId(ValueProvider<String> databaseId) {
+      SpannerConfig config = getSpannerConfig();
+      return withSpannerConfig(config.withDatabaseId(databaseId));
+    }
+
+    @VisibleForTesting
+    ReadAll withServiceFactory(ServiceFactory<Spanner, SpannerOptions> serviceFactory) {
+      SpannerConfig config = getSpannerConfig();
+      return withSpannerConfig(config.withServiceFactory(serviceFactory));
+    }
+
+    public ReadAll withTransaction(PCollectionView<Transaction> transaction) {
+      return toBuilder().setTransaction(transaction).build();
+    }
+
+    @Override
+    public PCollection<Struct> expand(PCollection<ReadOperation> input) {
+      PCollection<ReadOperation> reshuffled =
+          input
+              .apply(
+                  "Pair with random key",
+                  WithKeys.of(
+                      new SerializableFunction<ReadOperation, String>() {
+                        @Override
+                        public String apply(ReadOperation ignored) {
+                          return UUID.randomUUID().toString();
+                        }
+                      }))
+              .apply("Reshuffle", Reshuffle.<String, ReadOperation>of())
+              .apply("Strip keys", Values.<ReadOperation>create());
+      List<PCollectionView<Transaction>> sideInputs =
+          getTransaction() == null
+              ? Collections.<PCollectionView<Transaction>>emptyList()
+              : Collections.singletonList(getTransaction());
+      return reshuffled.apply(
+          "Execute queries",
+          ParDo.of(new NaiveSpannerReadFn(getSpannerConfig(), getTransaction()))
+              .withSideInputs(sideInputs));
+    }
+  }
+
+  /** Implementation of {@link #read}. */
+  @Experimental(Experimental.Kind.SOURCE_SINK)
+  @AutoValue
+  public abstract static class Read extends PTransform<PBegin, PCollection<Struct>> {
+
+    abstract SpannerConfig getSpannerConfig();
+
+    abstract ReadOperation getReadOperation();
 
     @Nullable
-    abstract KeySet getKeySet();
+    abstract TimestampBound getTimestampBound();
 
     @Nullable
     abstract PCollectionView<Transaction> getTransaction();
@@ -241,17 +334,9 @@ public class SpannerIO {
 
       abstract Builder setSpannerConfig(SpannerConfig spannerConfig);
 
-      abstract Builder setTimestampBound(TimestampBound timestampBound);
-
-      abstract Builder setQuery(Statement statement);
-
-      abstract Builder setTable(String table);
-
-      abstract Builder setIndex(String index);
+      abstract Builder setReadOperation(ReadOperation readOperation);
 
-      abstract Builder setColumns(List<String> columns);
-
-      abstract Builder setKeySet(KeySet keySet);
+      abstract Builder setTimestampBound(TimestampBound timestampBound);
 
       abstract Builder setTransaction(PCollectionView<Transaction> transaction);
 
@@ -315,7 +400,11 @@ public class SpannerIO {
     }
 
     public Read withTable(String table) {
-      return toBuilder().setTable(table).build();
+      return withReadOperation(getReadOperation().withTable(table));
+    }
+
+    public Read withReadOperation(ReadOperation operation) {
+      return toBuilder().setReadOperation(operation).build();
     }
 
     public Read withColumns(String... columns) {
@@ -323,11 +412,11 @@ public class SpannerIO {
     }
 
     public Read withColumns(List<String> columns) {
-      return toBuilder().setColumns(columns).build();
+      return withReadOperation(getReadOperation().withColumns(columns));
     }
 
     public Read withQuery(Statement statement) {
-      return toBuilder().setQuery(statement).build();
+      return withReadOperation(getReadOperation().withQuery(statement));
     }
 
     public Read withQuery(String sql) {
@@ -335,14 +424,13 @@ public class SpannerIO {
     }
 
     public Read withKeySet(KeySet keySet) {
-      return toBuilder().setKeySet(keySet).build();
+      return withReadOperation(getReadOperation().withKeySet(keySet));
     }
 
     public Read withIndex(String index) {
-      return toBuilder().setIndex(index).build();
+      return withReadOperation(getReadOperation().withIndex(index));
     }
 
-
     @Override
     public void validate(PipelineOptions options) {
       getSpannerConfig().validate(options);
@@ -351,16 +439,16 @@ public class SpannerIO {
           "SpannerIO.read() runs in a read only transaction and requires timestamp to be set "
               + "with withTimestampBound or withTimestamp method");
 
-      if (getQuery() != null) {
+      if (getReadOperation().getQuery() != null) {
         // TODO: validate query?
-      } else if (getTable() != null) {
+      } else if (getReadOperation().getTable() != null) {
         // Assume read
         checkNotNull(
-            getColumns(),
+            getReadOperation().getColumns(),
             "For a read operation SpannerIO.read() requires a list of "
                 + "columns to set with withColumns method");
         checkArgument(
-            !getColumns().isEmpty(),
+            !getReadOperation().getColumns().isEmpty(),
             "For a read operation SpannerIO.read() requires a"
                 + " list of columns to set with withColumns method");
       } else {
@@ -371,18 +459,17 @@ public class SpannerIO {
 
     @Override
     public PCollection<Struct> expand(PBegin input) {
-      Read config = this;
-      List<PCollectionView<Transaction>> sideInputs = Collections.emptyList();
-      if (getTimestampBound() != null) {
-        PCollectionView<Transaction> transaction =
-            input.apply(createTransaction().withSpannerConfig(getSpannerConfig()));
-        config = config.withTransaction(transaction);
-        sideInputs = Collections.singletonList(transaction);
+      PCollectionView<Transaction> transaction = getTransaction();
+      if (transaction == null && getTimestampBound() != null) {
+        transaction =
+            input.apply(
+                createTransaction()
+                    .withTimestampBound(getTimestampBound())
+                    .withSpannerConfig(getSpannerConfig()));
       }
-      return input
-          .apply(Create.of(1))
-          .apply(
-              "Execute query", ParDo.of(new NaiveSpannerReadFn(config)).withSideInputs(sideInputs));
+      ReadAll readAll =
+          readAll().withSpannerConfig(getSpannerConfig()).withTransaction(transaction);
+      return input.apply(Create.of(getReadOperation())).apply("Execute query", readAll);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/95e9c28c/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
index 5ba2da0..6eb1a33 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOReadTest.java
@@ -39,6 +39,7 @@ import java.util.List;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -55,6 +56,7 @@ import org.mockito.Mockito;
 /** Unit tests for {@link SpannerIO}. */
 @RunWith(JUnit4.class)
 public class SpannerIOReadTest implements Serializable {
+
   @Rule
   public final transient TestPipeline pipeline = TestPipeline.create();
   @Rule
@@ -63,12 +65,16 @@ public class SpannerIOReadTest implements Serializable {
   private FakeServiceFactory serviceFactory;
   private ReadOnlyTransaction mockTx;
 
-  private Type fakeType = Type.struct(Type.StructField.of("id", Type.int64()),
-      Type.StructField.of("name", Type.string()));
+  private static final Type FAKE_TYPE =
+      Type.struct(
+          Type.StructField.of("id", Type.int64()), Type.StructField.of("name", Type.string()));
 
-  private List<Struct> fakeRows = Arrays.asList(
-      Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(),
-      Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build());
+  private static final List<Struct> FAKE_ROWS =
+      Arrays.asList(
+          Struct.newBuilder().add("id", Value.int64(1)).add("name", Value.string("Alice")).build(),
+          Struct.newBuilder().add("id", Value.int64(2)).add("name", Value.string("Bob")).build(),
+          Struct.newBuilder().add("id", Value.int64(3)).add("name", Value.string("Carl")).build(),
+          Struct.newBuilder().add("id", Value.int64(4)).add("name", Value.string("Dan")).build());
 
   @Before
   @SuppressWarnings("unchecked")
@@ -153,20 +159,19 @@ public class SpannerIOReadTest implements Serializable {
             .withProjectId("test")
             .withInstanceId("123")
             .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
             .withQuery("SELECT * FROM users")
             .withServiceFactory(serviceFactory);
 
-    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
-    DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
+    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
+    DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);
 
     when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
         .thenReturn(mockTx);
     when(mockTx.executeQuery(any(Statement.class)))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+        .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
 
-    List<Struct> result = fnTester.processBundle(1);
-    assertThat(result, Matchers.<Struct>iterableWithSize(2));
+    List<Struct> result = fnTester.processBundle(read.getReadOperation());
+    assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));
 
     verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound
         .strong());
@@ -180,21 +185,20 @@ public class SpannerIOReadTest implements Serializable {
             .withProjectId("test")
             .withInstanceId("123")
             .withDatabaseId("aaa")
-            .withTimestamp(Timestamp.now())
             .withTable("users")
             .withColumns("id", "name")
             .withServiceFactory(serviceFactory);
 
-    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
-    DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
+    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
+    DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);
 
     when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
         .thenReturn(mockTx);
     when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+        .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
 
-    List<Struct> result = fnTester.processBundle(1);
-    assertThat(result, Matchers.<Struct>iterableWithSize(2));
+    List<Struct> result = fnTester.processBundle(read.getReadOperation());
+    assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));
 
     verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
     verify(mockTx).read("users", KeySet.all(), Arrays.asList("id", "name"));
@@ -213,16 +217,16 @@ public class SpannerIOReadTest implements Serializable {
             .withIndex("theindex")
             .withServiceFactory(serviceFactory);
 
-    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read);
-    DoFnTester<Object, Struct> fnTester = DoFnTester.of(readFn);
+    NaiveSpannerReadFn readFn = new NaiveSpannerReadFn(read.getSpannerConfig());
+    DoFnTester<ReadOperation, Struct> fnTester = DoFnTester.of(readFn);
 
     when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
         .thenReturn(mockTx);
     when(mockTx.readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+        .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
 
-    List<Struct> result = fnTester.processBundle(1);
-    assertThat(result, Matchers.<Struct>iterableWithSize(2));
+    List<Struct> result = fnTester.processBundle(read.getReadOperation());
+    assertThat(result, Matchers.containsInAnyOrder(FAKE_ROWS.toArray()));
 
     verify(serviceFactory.mockDatabaseClient()).readOnlyTransaction(TimestampBound.strong());
     verify(mockTx).readUsingIndex("users", "theindex", KeySet.all(), Arrays.asList("id", "name"));
@@ -233,30 +237,32 @@ public class SpannerIOReadTest implements Serializable {
   public void readPipeline() throws Exception {
     Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
 
-    PCollectionView<Transaction> tx = pipeline
-        .apply("tx", SpannerIO.createTransaction()
+    SpannerConfig spannerConfig =
+        SpannerConfig.create()
             .withProjectId("test")
             .withInstanceId("123")
             .withDatabaseId("aaa")
-            .withServiceFactory(serviceFactory));
-
-    PCollection<Struct> one = pipeline.apply("read q", SpannerIO.read()
-        .withProjectId("test")
-        .withInstanceId("123")
-        .withDatabaseId("aaa")
-        .withTimestamp(Timestamp.now())
-        .withQuery("SELECT * FROM users")
-        .withServiceFactory(serviceFactory)
-        .withTransaction(tx));
-    PCollection<Struct> two = pipeline.apply("read r", SpannerIO.read()
-        .withProjectId("test")
-        .withInstanceId("123")
-        .withDatabaseId("aaa")
-        .withTimestamp(Timestamp.now())
-        .withTable("users")
-        .withColumns("id", "name")
-        .withServiceFactory(serviceFactory)
-        .withTransaction(tx));
+            .withServiceFactory(serviceFactory);
+
+    PCollectionView<Transaction> tx =
+        pipeline.apply("tx", SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
+
+    PCollection<Struct> one =
+        pipeline.apply(
+            "read q",
+            SpannerIO.read()
+                .withSpannerConfig(spannerConfig)
+                .withQuery("SELECT * FROM users")
+                .withTransaction(tx));
+    PCollection<Struct> two =
+        pipeline.apply(
+            "read r",
+            SpannerIO.read()
+                .withSpannerConfig(spannerConfig)
+                .withTimestamp(Timestamp.now())
+                .withTable("users")
+                .withColumns("id", "name")
+                .withTransaction(tx));
 
     when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
         .thenReturn(mockTx);
@@ -265,13 +271,58 @@ public class SpannerIOReadTest implements Serializable {
         Collections.<Struct>emptyList()));
 
     when(mockTx.executeQuery(Statement.of("SELECT * FROM users")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+        .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
+    when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
+        .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS));
+    when(mockTx.getReadTimestamp()).thenReturn(timestamp);
+
+    PAssert.that(one).containsInAnyOrder(FAKE_ROWS);
+    PAssert.that(two).containsInAnyOrder(FAKE_ROWS);
+
+    pipeline.run();
+
+    verify(serviceFactory.mockDatabaseClient(), times(2))
+        .readOnlyTransaction(TimestampBound.ofReadTimestamp(timestamp));
+  }
+
+  @Test
+  @Category(NeedsRunner.class)
+  public void readAllPipeline() throws Exception {
+    Timestamp timestamp = Timestamp.ofTimeMicroseconds(12345);
+
+    SpannerConfig spannerConfig =
+        SpannerConfig.create()
+            .withProjectId("test")
+            .withInstanceId("123")
+            .withDatabaseId("aaa")
+            .withServiceFactory(serviceFactory);
+
+    PCollectionView<Transaction> tx =
+        pipeline.apply("tx", SpannerIO.createTransaction().withSpannerConfig(spannerConfig));
+
+    PCollection<ReadOperation> reads =
+        pipeline.apply(
+            Create.of(
+                ReadOperation.create().withQuery("SELECT * FROM users"),
+                ReadOperation.create().withTable("users").withColumns("id", "name")));
+
+    PCollection<Struct> one =
+        reads.apply(
+            "read all", SpannerIO.readAll().withSpannerConfig(spannerConfig).withTransaction(tx));
+
+    when(serviceFactory.mockDatabaseClient().readOnlyTransaction(any(TimestampBound.class)))
+        .thenReturn(mockTx);
+
+    when(mockTx.executeQuery(Statement.of("SELECT 1")))
+        .thenReturn(ResultSets.forRows(Type.struct(), Collections.<Struct>emptyList()));
+
+    when(mockTx.executeQuery(Statement.of("SELECT * FROM users")))
+        .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(0, 2)));
     when(mockTx.read("users", KeySet.all(), Arrays.asList("id", "name")))
-        .thenReturn(ResultSets.forRows(fakeType, fakeRows));
+        .thenReturn(ResultSets.forRows(FAKE_TYPE, FAKE_ROWS.subList(2, 4)));
     when(mockTx.getReadTimestamp()).thenReturn(timestamp);
 
-    PAssert.that(one).containsInAnyOrder(fakeRows);
-    PAssert.that(two).containsInAnyOrder(fakeRows);
+    PAssert.that(one).containsInAnyOrder(FAKE_ROWS);
 
     pipeline.run();