Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/27 01:16:12 UTC

[beam] branch master updated: [#22051]: Add read_time support to Google Cloud Datastore connector (#22052)

This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new edd915d6956 [#22051]: Add read_time support to Google Cloud Datastore connector (#22052)
edd915d6956 is described below

commit edd915d6956fe6e24ac37dd9255fd20e15e93920
Author: yixiaoshen <yi...@google.com>
AuthorDate: Tue Jul 26 18:16:05 2022 -0700

    [#22051]: Add read_time support to Google Cloud Datastore connector (#22052)
    
    * [#22051]: add read_time support in Google Cloud Datastore connector.
    
    * Re-arrange @Nullable annotation position.
---
 .../org/apache/beam/gradle/BeamModulePlugin.groovy |    4 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java     |  167 ++-
 .../beam/sdk/io/gcp/datastore/DatastoreV1Test.java | 1531 +++++++++++---------
 .../beam/sdk/io/gcp/datastore/SplitQueryFnIT.java  |   19 +-
 .../apache/beam/sdk/io/gcp/datastore/V1ReadIT.java |   58 +-
 5 files changed, 1023 insertions(+), 756 deletions(-)

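Before the diff itself, here is a minimal, hypothetical sketch (not part of the commit) of how a pipeline author might use the new read_time option. Only DatastoreIO.v1().read(), withProjectId(), withQuery(), and the new withReadTime() come from this change; the project id, kind name, and timestamp below are placeholders.

    import com.google.datastore.v1.Entity;
    import com.google.datastore.v1.KindExpression;
    import com.google.datastore.v1.Query;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Instant;

    public class DatastoreReadTimeExample {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Placeholder query over a hypothetical kind.
        Query query =
            Query.newBuilder().addKind(KindExpression.newBuilder().setName("MyKind")).build();

        // Read the entities as they existed one hour ago instead of at execution time.
        PCollection<Entity> entities =
            pipeline.apply(
                DatastoreIO.v1()
                    .read()
                    .withProjectId("my-project") // placeholder project id
                    .withQuery(query)
                    .withReadTime(Instant.now().minus(60 * 60 * 1000L)));

        // pipeline.run() would execute the read against a real Cloud Datastore project.
      }
    }

When withReadTime() is not called, readTime stays null and the connector behaves exactly as before, querying the current state of the database.
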
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index c7d8a55145b..2e5cf0eccf2 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -592,7 +592,7 @@ class BeamModulePlugin implements Plugin<Project> {
         google_cloud_core_grpc                      : "com.google.cloud:google-cloud-core-grpc", // google_cloud_platform_libraries_bom sets version
         google_cloud_datacatalog_v1beta1            : "com.google.cloud:google-cloud-datacatalog", // google_cloud_platform_libraries_bom sets version
         google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304",
-        google_cloud_datastore_v1_proto_client      : "com.google.cloud.datastore:datastore-v1-proto-client:2.2.10",
+        google_cloud_datastore_v1_proto_client      : "com.google.cloud.datastore:datastore-v1-proto-client:2.9.0",
         google_cloud_firestore                      : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version
         google_cloud_pubsub                         : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version
         google_cloud_pubsublite                     : "com.google.cloud:google-cloud-pubsublite",  // google_cloud_platform_libraries_bom sets version
@@ -687,7 +687,7 @@ class BeamModulePlugin implements Plugin<Project> {
         proto_google_cloud_bigtable_admin_v2        : "com.google.api.grpc:proto-google-cloud-bigtable-admin-v2", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_bigtable_v2              : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_datacatalog_v1beta1      : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version
-        proto_google_cloud_datastore_v1             : "com.google.api.grpc:proto-google-cloud-datastore-v1:0.93.10", // google_cloud_platform_libraries_bom sets version
+        proto_google_cloud_datastore_v1             : "com.google.api.grpc:proto-google-cloud-datastore-v1:0.100.0", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_firestore_v1             : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsub_v1                : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
         proto_google_cloud_pubsublite_v1            : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 9137335943c..06b7c5292fd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -45,6 +45,7 @@ import com.google.datastore.v1.Mutation;
 import com.google.datastore.v1.PartitionId;
 import com.google.datastore.v1.Query;
 import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.ReadOptions;
 import com.google.datastore.v1.RunQueryRequest;
 import com.google.datastore.v1.RunQueryResponse;
 import com.google.datastore.v1.client.Datastore;
@@ -54,6 +55,8 @@ import com.google.datastore.v1.client.DatastoreHelper;
 import com.google.datastore.v1.client.DatastoreOptions;
 import com.google.datastore.v1.client.QuerySplitter;
 import com.google.protobuf.Int32Value;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Timestamps;
 import com.google.rpc.Code;
 import java.io.IOException;
 import java.io.Serializable;
@@ -320,6 +323,8 @@ public class DatastoreV1 {
 
     public abstract @Nullable String getLocalhost();
 
+    public abstract @Nullable Instant getReadTime();
+
     @Override
     public abstract String toString();
 
@@ -339,6 +344,8 @@ public class DatastoreV1 {
 
       abstract Builder setLocalhost(String localhost);
 
+      abstract Builder setReadTime(Instant readTime);
+
       abstract Read build();
     }
 
@@ -346,10 +353,11 @@ public class DatastoreV1 {
      * Computes the number of splits to be performed on the given query by querying the estimated
      * size from Cloud Datastore.
      */
-    static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
+    static int getEstimatedNumSplits(
+        Datastore datastore, Query query, @Nullable String namespace, @Nullable Instant readTime) {
       int numSplits;
       try {
-        long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
+        long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace, readTime);
         LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes);
         numSplits =
             (int)
@@ -370,7 +378,8 @@ public class DatastoreV1 {
      * table.
      */
     private static long queryLatestStatisticsTimestamp(
-        Datastore datastore, @Nullable String namespace) throws DatastoreException {
+        Datastore datastore, @Nullable String namespace, @Nullable Instant readTime)
+        throws DatastoreException {
       Query.Builder query = Query.newBuilder();
       // Note: namespace either being null or empty represents the default namespace, in which
       // case we treat it as not provided by the user.
@@ -381,7 +390,7 @@ public class DatastoreV1 {
       }
       query.addOrder(makeOrder("timestamp", DESCENDING));
       query.setLimit(Int32Value.newBuilder().setValue(1));
-      RunQueryRequest request = makeRequest(query.build(), namespace);
+      RunQueryRequest request = makeRequest(query.build(), namespace, readTime);
 
       RunQueryResponse response = datastore.runQuery(request);
       QueryResultBatch batch = response.getBatch();
@@ -392,10 +401,14 @@ public class DatastoreV1 {
       return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000;
     }
 
-    /** Retrieve latest table statistics for a given kind, namespace, and datastore. */
+    /**
+     * Retrieve latest table statistics for a given kind, namespace, and datastore. If the Read has
+     * readTime specified, the latest statistics at or before readTime is retrieved.
+     */
     private static Entity getLatestTableStats(
-        String ourKind, @Nullable String namespace, Datastore datastore) throws DatastoreException {
-      long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
+        String ourKind, @Nullable String namespace, Datastore datastore, @Nullable Instant readTime)
+        throws DatastoreException {
+      long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace, readTime);
       LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);
 
       Query.Builder queryBuilder = Query.newBuilder();
@@ -410,7 +423,7 @@ public class DatastoreV1 {
               makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(),
               makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build()));
 
-      RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+      RunQueryRequest request = makeRequest(queryBuilder.build(), namespace, readTime);
 
       long now = System.currentTimeMillis();
       RunQueryResponse response = datastore.runQuery(request);
@@ -433,10 +446,11 @@ public class DatastoreV1 {
      *
      * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
      */
-    static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
+    static long getEstimatedSizeBytes(
+        Datastore datastore, Query query, @Nullable String namespace, @Nullable Instant readTime)
         throws DatastoreException {
       String ourKind = query.getKind(0).getName();
-      Entity entity = getLatestTableStats(ourKind, namespace, datastore);
+      Entity entity = getLatestTableStats(ourKind, namespace, datastore, readTime);
       return entity.getProperties().get("entity_bytes").getIntegerValue();
     }
 
@@ -451,21 +465,38 @@ public class DatastoreV1 {
       return partitionBuilder;
     }
 
-    /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */
-    static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
-      return RunQueryRequest.newBuilder()
-          .setQuery(query)
-          .setPartitionId(forNamespace(namespace))
-          .build();
+    /**
+     * Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}, optionally at
+     * the requested {@code readTime}.
+     */
+    static RunQueryRequest makeRequest(
+        Query query, @Nullable String namespace, @Nullable Instant readTime) {
+      RunQueryRequest.Builder request =
+          RunQueryRequest.newBuilder().setQuery(query).setPartitionId(forNamespace(namespace));
+      if (readTime != null) {
+        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+        request.setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto).build());
+      }
+      return request.build();
     }
 
     @VisibleForTesting
-    /** Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}. */
-    static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String namespace) {
-      return RunQueryRequest.newBuilder()
-          .setGqlQuery(gqlQuery)
-          .setPartitionId(forNamespace(namespace))
-          .build();
+    /**
+     * Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}, optionally
+     * at the requested {@code readTime}.
+     */
+    static RunQueryRequest makeRequest(
+        GqlQuery gqlQuery, @Nullable String namespace, @Nullable Instant readTime) {
+      RunQueryRequest.Builder request =
+          RunQueryRequest.newBuilder()
+              .setGqlQuery(gqlQuery)
+              .setPartitionId(forNamespace(namespace));
+      if (readTime != null) {
+        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+        request.setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto).build());
+      }
+
+      return request.build();
     }
 
     /**
@@ -477,10 +508,16 @@ public class DatastoreV1 {
         @Nullable String namespace,
         Datastore datastore,
         QuerySplitter querySplitter,
-        int numSplits)
+        int numSplits,
+        @Nullable Instant readTime)
         throws DatastoreException {
       // If namespace is set, include it in the split request so splits are calculated accordingly.
-      return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore);
+      PartitionId partitionId = forNamespace(namespace).build();
+      if (readTime != null) {
+        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+        return querySplitter.getSplits(query, partitionId, numSplits, datastore, readTimeProto);
+      }
+      return querySplitter.getSplits(query, partitionId, numSplits, datastore);
     }
 
     /**
@@ -497,11 +534,13 @@ public class DatastoreV1 {
      * problem in practice.
      */
     @VisibleForTesting
-    static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, String namespace)
+    static Query translateGqlQueryWithLimitCheck(
+        String gql, Datastore datastore, String namespace, @Nullable Instant readTime)
         throws DatastoreException {
       String gqlQueryWithZeroLimit = gql + " LIMIT 0";
       try {
-        Query translatedQuery = translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace);
+        Query translatedQuery =
+            translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace, readTime);
         // Clear the limit that we set.
         return translatedQuery.toBuilder().clearLimit().build();
       } catch (DatastoreException e) {
@@ -512,7 +551,7 @@ public class DatastoreV1 {
           LOG.warn("Failed to translate Gql query '{}': {}", gqlQueryWithZeroLimit, e.getMessage());
           LOG.warn("User query might have a limit already set, so trying without zero limit");
           // Retry without the zero limit.
-          return translateGqlQuery(gql, datastore, namespace);
+          return translateGqlQuery(gql, datastore, namespace, readTime);
         } else {
           throw e;
         }
@@ -520,10 +559,11 @@ public class DatastoreV1 {
     }
 
     /** Translates a gql query string to {@link Query}. */
-    private static Query translateGqlQuery(String gql, Datastore datastore, String namespace)
+    private static Query translateGqlQuery(
+        String gql, Datastore datastore, String namespace, @Nullable Instant readTime)
         throws DatastoreException {
       GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
-      RunQueryRequest req = makeRequest(gqlQuery, namespace);
+      RunQueryRequest req = makeRequest(gqlQuery, namespace, readTime);
       return datastore.runQuery(req).getQuery();
     }
 
@@ -628,6 +668,11 @@ public class DatastoreV1 {
       return toBuilder().setLocalhost(localhost).build();
     }
 
+    /** Returns a new {@link DatastoreV1.Read} that reads at the specified {@code readTime}. */
+    public DatastoreV1.Read withReadTime(Instant readTime) {
+      return toBuilder().setReadTime(readTime).build();
+    }
+
     /** Returns Number of entities available for reading. */
     public long getNumEntities(
         PipelineOptions options, String ourKind, @Nullable String namespace) {
@@ -638,7 +683,7 @@ public class DatastoreV1 {
             datastoreFactory.getDatastore(
                 options, v1Options.getProjectId(), v1Options.getLocalhost());
 
-        Entity entity = getLatestTableStats(ourKind, namespace, datastore);
+        Entity entity = getLatestTableStats(ourKind, namespace, datastore, getReadTime());
         return entity.getProperties().get("count").getIntegerValue();
       } catch (Exception e) {
         return -1;
@@ -688,13 +733,13 @@ public class DatastoreV1 {
         inputQuery =
             input
                 .apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of()))
-                .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
+                .apply(ParDo.of(new GqlQueryTranslateFn(v1Options, getReadTime())));
       }
 
       return inputQuery
-          .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits())))
+          .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits(), getReadTime())))
           .apply("Reshuffle", Reshuffle.viaRandomKey())
-          .apply("Read", ParDo.of(new ReadFn(v1Options)));
+          .apply("Read", ParDo.of(new ReadFn(v1Options, getReadTime())));
     }
 
     @Override
@@ -705,7 +750,8 @@ public class DatastoreV1 {
           .addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("ProjectId"))
           .addIfNotNull(DisplayData.item("namespace", getNamespace()).withLabel("Namespace"))
           .addIfNotNull(DisplayData.item("query", query).withLabel("Query"))
-          .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()).withLabel("GqlQuery"));
+          .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()).withLabel("GqlQuery"))
+          .addIfNotNull(DisplayData.item("readTime", getReadTime()).withLabel("ReadTime"));
     }
 
     @VisibleForTesting
@@ -764,15 +810,22 @@ public class DatastoreV1 {
     /** A DoFn that translates a Cloud Datastore gql query string to {@code Query}. */
     static class GqlQueryTranslateFn extends DoFn<String, Query> {
       private final V1Options v1Options;
+      private final @Nullable Instant readTime;
       private transient Datastore datastore;
       private final V1DatastoreFactory datastoreFactory;
 
       GqlQueryTranslateFn(V1Options options) {
-        this(options, new V1DatastoreFactory());
+        this(options, null, new V1DatastoreFactory());
       }
 
-      GqlQueryTranslateFn(V1Options options, V1DatastoreFactory datastoreFactory) {
+      GqlQueryTranslateFn(V1Options options, @Nullable Instant readTime) {
+        this(options, readTime, new V1DatastoreFactory());
+      }
+
+      GqlQueryTranslateFn(
+          V1Options options, @Nullable Instant readTime, V1DatastoreFactory datastoreFactory) {
         this.v1Options = options;
+        this.readTime = readTime;
         this.datastoreFactory = datastoreFactory;
       }
 
@@ -788,7 +841,8 @@ public class DatastoreV1 {
         String gqlQuery = c.element();
         LOG.info("User query: '{}'", gqlQuery);
         Query query =
-            translateGqlQueryWithLimitCheck(gqlQuery, datastore, v1Options.getNamespace());
+            translateGqlQueryWithLimitCheck(
+                gqlQuery, datastore, v1Options.getNamespace(), readTime);
         LOG.info("User gql query translated to Query({})", query);
         c.output(query);
       }
@@ -803,6 +857,8 @@ public class DatastoreV1 {
       private final V1Options options;
       // number of splits to make for a given query
       private final int numSplits;
+      // time from which to run the queries
+      private final @Nullable Instant readTime;
 
       private final V1DatastoreFactory datastoreFactory;
       // Datastore client
@@ -811,14 +867,23 @@ public class DatastoreV1 {
       private transient QuerySplitter querySplitter;
 
       public SplitQueryFn(V1Options options, int numSplits) {
-        this(options, numSplits, new V1DatastoreFactory());
+        this(options, numSplits, null, new V1DatastoreFactory());
+      }
+
+      public SplitQueryFn(V1Options options, int numSplits, @Nullable Instant readTime) {
+        this(options, numSplits, readTime, new V1DatastoreFactory());
       }
 
       @VisibleForTesting
-      SplitQueryFn(V1Options options, int numSplits, V1DatastoreFactory datastoreFactory) {
+      SplitQueryFn(
+          V1Options options,
+          int numSplits,
+          @Nullable Instant readTime,
+          V1DatastoreFactory datastoreFactory) {
         this.options = options;
         this.numSplits = numSplits;
         this.datastoreFactory = datastoreFactory;
+        this.readTime = readTime;
       }
 
       @StartBundle
@@ -842,7 +907,8 @@ public class DatastoreV1 {
         int estimatedNumSplits;
         // Compute the estimated numSplits if numSplits is not specified by the user.
         if (numSplits <= 0) {
-          estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
+          estimatedNumSplits =
+              getEstimatedNumSplits(datastore, query, options.getNamespace(), readTime);
         } else {
           estimatedNumSplits = numSplits;
         }
@@ -852,7 +918,12 @@ public class DatastoreV1 {
         try {
           querySplits =
               splitQuery(
-                  query, options.getNamespace(), datastore, querySplitter, estimatedNumSplits);
+                  query,
+                  options.getNamespace(),
+                  datastore,
+                  querySplitter,
+                  estimatedNumSplits,
+                  readTime);
         } catch (Exception e) {
           LOG.warn("Unable to parallelize the given query: {}", query, e);
           querySplits = ImmutableList.of(query);
@@ -873,6 +944,7 @@ public class DatastoreV1 {
               DisplayData.item("numQuerySplits", numSplits)
                   .withLabel("Requested number of Query splits"));
         }
+        builder.addIfNotNull(DisplayData.item("readTime", readTime).withLabel("ReadTime"));
       }
     }
 
@@ -880,6 +952,7 @@ public class DatastoreV1 {
     @VisibleForTesting
     static class ReadFn extends DoFn<Query, Entity> {
       private final V1Options options;
+      private final @Nullable Instant readTime;
       private final V1DatastoreFactory datastoreFactory;
       // Datastore client
       private transient Datastore datastore;
@@ -894,12 +967,17 @@ public class DatastoreV1 {
               .withInitialBackoff(Duration.standardSeconds(5));
 
       public ReadFn(V1Options options) {
-        this(options, new V1DatastoreFactory());
+        this(options, null, new V1DatastoreFactory());
+      }
+
+      public ReadFn(V1Options options, @Nullable Instant readTime) {
+        this(options, readTime, new V1DatastoreFactory());
       }
 
       @VisibleForTesting
-      ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) {
+      ReadFn(V1Options options, @Nullable Instant readTime, V1DatastoreFactory datastoreFactory) {
         this.options = options;
+        this.readTime = readTime;
         this.datastoreFactory = datastoreFactory;
       }
 
@@ -967,7 +1045,7 @@ public class DatastoreV1 {
             queryBuilder.setStartCursor(currentBatch.getEndCursor());
           }
 
-          RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+          RunQueryRequest request = makeRequest(queryBuilder.build(), namespace, readTime);
           RunQueryResponse response = runQueryWithRetries(request);
 
           currentBatch = response.getBatch();
@@ -1005,6 +1083,7 @@ public class DatastoreV1 {
       public void populateDisplayData(DisplayData.Builder builder) {
         super.populateDisplayData(builder);
         builder.include("options", options);
+        builder.addIfNotNull(DisplayData.item("readTime", readTime).withLabel("ReadTime"));
       }
     }
   }
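
To summarize the request-level effect of the DatastoreV1.java changes above: when a readTime is supplied, the Joda Instant is converted to a protobuf Timestamp and attached to the RunQueryRequest as ReadOptions; otherwise the request is built as before. The following stand-alone restatement of the new makeRequest() logic is illustrative only (class and method names are not from the commit):

    import com.google.datastore.v1.PartitionId;
    import com.google.datastore.v1.Query;
    import com.google.datastore.v1.ReadOptions;
    import com.google.datastore.v1.RunQueryRequest;
    import com.google.protobuf.Timestamp;
    import com.google.protobuf.util.Timestamps;
    import org.joda.time.Instant;

    public final class ReadTimeRequestSketch {
      /** Builds a query request, optionally pinned to a point-in-time read. readTime may be null. */
      static RunQueryRequest requestAt(Query query, PartitionId partitionId, Instant readTime) {
        RunQueryRequest.Builder request =
            RunQueryRequest.newBuilder().setQuery(query).setPartitionId(partitionId);
        if (readTime != null) {
          // Joda millis -> protobuf Timestamp, then attach as the request's read options.
          Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
          request.setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto).build());
        }
        return request.build();
      }
    }

The test changes that follow exercise this behavior through the public Read transform.
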
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 0fc895b0d9e..4aed59c4da3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -67,8 +67,12 @@ import com.google.datastore.v1.client.Datastore;
 import com.google.datastore.v1.client.DatastoreException;
 import com.google.datastore.v1.client.QuerySplitter;
 import com.google.protobuf.Int32Value;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Timestamps;
 import com.google.rpc.Code;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
@@ -101,17 +105,23 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.mockito.ArgumentCaptor;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
 /** Tests for {@link DatastoreV1}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
 public class DatastoreV1Test {
   private static final String PROJECT_ID = "testProject";
   private static final String NAMESPACE = "testNamespace";
@@ -119,6 +129,8 @@ public class DatastoreV1Test {
   private static final Query QUERY;
   private static final String LOCALHOST = "localhost:9955";
   private static final String GQL_QUERY = "SELECT * from " + KIND;
+  private static final Instant TIMESTAMP = Instant.now();
+  private static final Timestamp TIMESTAMP_PROTO = Timestamps.fromMillis(TIMESTAMP.getMillis());
   private static final V1Options V_1_OPTIONS;
 
   static {
@@ -128,9 +140,9 @@ public class DatastoreV1Test {
     V_1_OPTIONS = V1Options.from(PROJECT_ID, NAMESPACE, null);
   }
 
-  @Mock private Datastore mockDatastore;
-  @Mock QuerySplitter mockQuerySplitter;
-  @Mock V1DatastoreFactory mockDatastoreFactory;
+  @Mock protected Datastore mockDatastore;
+  @Mock protected QuerySplitter mockQuerySplitter;
+  @Mock protected V1DatastoreFactory mockDatastoreFactory;
 
   @Rule public final ExpectedException thrown = ExpectedException.none();
 
@@ -146,782 +158,890 @@ public class DatastoreV1Test {
     MetricsEnvironment.setProcessWideContainer(container);
   }
 
-  @Test
-  public void testBuildRead() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  @Test
-  public void testBuildReadWithGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withNamespace(NAMESPACE);
-    assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  /** {@link #testBuildRead} but constructed in a different order. */
-  @Test
-  public void testBuildReadAlt() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withQuery(QUERY)
-            .withNamespace(NAMESPACE)
-            .withProjectId(PROJECT_ID)
-            .withLocalhost(LOCALHOST);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-    assertEquals(LOCALHOST, read.getLocalhost());
-  }
-
-  @Test
-  public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withQuery(QUERY);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
-    read.expand(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+  @RunWith(JUnit4.class)
+  public static class SingletonTests extends DatastoreV1Test {
+    @Test
+    public void testBuildRead() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withNamespace(NAMESPACE);
+      assertEquals(QUERY, read.getQuery());
+      assertEquals(PROJECT_ID, read.getProjectId().get());
+      assertEquals(NAMESPACE, read.getNamespace().get());
+    }
 
-    DisplayData displayData = DisplayData.from(read);
+    @Test
+    public void testBuildReadWithReadTime() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withReadTime(TIMESTAMP);
+      assertEquals(TIMESTAMP, read.getReadTime());
+      assertEquals(QUERY, read.getQuery());
+      assertEquals(PROJECT_ID, read.getProjectId().get());
+    }
 
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-    assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
-    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
-  }
+    @Test
+    public void testBuildReadWithGqlQuery() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withLiteralGqlQuery(GQL_QUERY)
+              .withNamespace(NAMESPACE);
+      assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
+      assertEquals(PROJECT_ID, read.getProjectId().get());
+      assertEquals(NAMESPACE, read.getNamespace().get());
+    }
 
-  @Test
-  public void testReadDisplayDataWithGqlQuery() {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withNamespace(NAMESPACE);
+    /** {@link #testBuildRead} but constructed in a different order. */
+    @Test
+    public void testBuildReadAlt() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withReadTime(TIMESTAMP)
+              .withQuery(QUERY)
+              .withNamespace(NAMESPACE)
+              .withProjectId(PROJECT_ID)
+              .withLocalhost(LOCALHOST);
+      assertEquals(TIMESTAMP, read.getReadTime());
+      assertEquals(QUERY, read.getQuery());
+      assertEquals(PROJECT_ID, read.getProjectId().get());
+      assertEquals(NAMESPACE, read.getNamespace().get());
+      assertEquals(LOCALHOST, read.getLocalhost());
+    }
 
-    DisplayData displayData = DisplayData.from(read);
+    @Test
+    public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withLiteralGqlQuery(GQL_QUERY)
+              .withQuery(QUERY);
+
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
+      read.expand(null);
+    }
 
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-    assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY));
-    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
-  }
+    @Test
+    public void testReadValidationFailsQueryLimitZero() throws Exception {
+      Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("Invalid query limit 0: must be positive");
 
-  @Test
-  public void testSourcePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    int numSplits = 98;
-    PTransform<PBegin, PCollection<Entity>> read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withQuery(Query.newBuilder().build())
-            .withNumQuerySplits(numSplits);
-
-    String assertMessage = "DatastoreIO read should include the '%s' in its primitive display data";
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat(
-        String.format(assertMessage, "project id"),
-        displayData,
-        hasItem(hasDisplayItem("projectId", PROJECT_ID)));
-    assertThat(
-        String.format(assertMessage, "number of query splits"),
-        displayData,
-        hasItem(hasDisplayItem("numQuerySplits", numSplits)));
-  }
+      DatastoreIO.v1().read().withQuery(invalidLimit);
+    }
 
-  @Test
-  public void testWriteDisplayData() {
-    Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+    @Test
+    public void testReadValidationFailsQueryLimitNegative() throws Exception {
+      Query invalidLimit =
+          Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("Invalid query limit -5: must be positive");
 
-    DisplayData displayData = DisplayData.from(write);
+      DatastoreIO.v1().read().withQuery(invalidLimit);
+    }
 
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
+    @Test
+    public void testReadDisplayData() {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withNamespace(NAMESPACE)
+              .withReadTime(TIMESTAMP);
+
+      DisplayData displayData = DisplayData.from(read);
+
+      assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+      assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
+      assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+      assertThat(displayData, hasDisplayItem("readTime", TIMESTAMP));
+    }
 
-  @Test
-  public void testDeleteEntityDisplayData() {
-    DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
+    @Test
+    public void testReadDisplayDataWithGqlQuery() {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withLiteralGqlQuery(GQL_QUERY)
+              .withNamespace(NAMESPACE)
+              .withReadTime(TIMESTAMP);
+
+      DisplayData displayData = DisplayData.from(read);
+
+      assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+      assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY));
+      assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+      assertThat(displayData, hasDisplayItem("readTime", TIMESTAMP));
+    }
 
-    DisplayData displayData = DisplayData.from(deleteEntity);
+    @Test
+    public void testSourcePrimitiveDisplayData() {
+      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+      int numSplits = 98;
+      PTransform<PBegin, PCollection<Entity>> read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(Query.newBuilder().build())
+              .withNumQuerySplits(numSplits);
+
+      String assertMessage =
+          "DatastoreIO read should include the '%s' in its primitive display data";
+      Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+      assertThat(
+          String.format(assertMessage, "project id"),
+          displayData,
+          hasItem(hasDisplayItem("projectId", PROJECT_ID)));
+      assertThat(
+          String.format(assertMessage, "number of query splits"),
+          displayData,
+          hasItem(hasDisplayItem("numQuerySplits", numSplits)));
+    }
 
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
+    @Test
+    public void testWriteDisplayData() {
+      Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
 
-  @Test
-  public void testDeleteKeyDisplayData() {
-    DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
+      DisplayData displayData = DisplayData.from(write);
 
-    DisplayData displayData = DisplayData.from(deleteKey);
+      assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+    }
 
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
+    @Test
+    public void testDeleteEntityDisplayData() {
+      DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
 
-  @Test
-  public void testWritePrimitiveDisplayData() {
-    int hintNumWorkers = 10;
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1().write().withProjectId("myProject").withHintNumWorkers(hintNumWorkers);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat(
-        "DatastoreIO write should include the project in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("projectId")));
-    assertThat(
-        "DatastoreIO write should include the upsertFn in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("upsertFn")));
-    assertThat(
-        "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
-        displayData,
-        hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
-  }
+      DisplayData displayData = DisplayData.from(deleteEntity);
 
-  @Test
-  public void testWritePrimitiveDisplayDataDisabledThrottler() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1().write().withProjectId("myProject").withRampupThrottlingDisabled();
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat(
-        "DatastoreIO write should include the project in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("projectId")));
-    assertThat(
-        "DatastoreIO write should include the upsertFn in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("upsertFn")));
-    assertThat(
-        "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
-        displayData,
-        not(hasItem(hasDisplayItem("hintNumWorkers"))));
-  }
+      assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+    }
 
-  @Test
-  public void testDeleteEntityPrimitiveDisplayData() {
-    int hintNumWorkers = 10;
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1()
-            .deleteEntity()
-            .withProjectId("myProject")
-            .withHintNumWorkers(hintNumWorkers);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat(
-        "DatastoreIO write should include the project in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("projectId")));
-    assertThat(
-        "DatastoreIO write should include the deleteEntityFn in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("deleteEntityFn")));
-    assertThat(
-        "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
-        displayData,
-        hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
-  }
+    @Test
+    public void testDeleteKeyDisplayData() {
+      DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
 
-  @Test
-  public void testDeleteKeyPrimitiveDisplayData() {
-    int hintNumWorkers = 10;
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Key>, ?> write =
-        DatastoreIO.v1().deleteKey().withProjectId("myProject").withHintNumWorkers(hintNumWorkers);
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat(
-        "DatastoreIO write should include the project in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("projectId")));
-    assertThat(
-        "DatastoreIO write should include the deleteKeyFn in its primitive display data",
-        displayData,
-        hasItem(hasDisplayItem("deleteKeyFn")));
-    assertThat(
-        "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
-        displayData,
-        hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
-  }
+      DisplayData displayData = DisplayData.from(deleteKey);
 
-  /** Test building a Write using builder methods. */
-  @Test
-  public void testBuildWrite() throws Exception {
-    DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
-    assertEquals(PROJECT_ID, write.getProjectId());
-  }
+      assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+    }
 
-  /** Test the detection of complete and incomplete keys. */
-  @Test
-  public void testHasNameOrId() {
-    Key key;
-    // Complete with name, no ancestor
-    key = makeKey("bird", "finch").build();
-    assertTrue(isValidKey(key));
-
-    // Complete with id, no ancestor
-    key = makeKey("bird", 123).build();
-    assertTrue(isValidKey(key));
-
-    // Incomplete, no ancestor
-    key = makeKey("bird").build();
-    assertFalse(isValidKey(key));
-
-    // Complete with name and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", "horned").build();
-    assertTrue(isValidKey(key));
-
-    // Complete with id and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", 123).build();
-    assertTrue(isValidKey(key));
-
-    // Incomplete with ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird").build();
-    assertFalse(isValidKey(key));
-
-    key = makeKey().build();
-    assertFalse(isValidKey(key));
-  }
+    @Test
+    public void testWritePrimitiveDisplayData() {
+      int hintNumWorkers = 10;
+      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+      PTransform<PCollection<Entity>, ?> write =
+          DatastoreIO.v1().write().withProjectId("myProject").withHintNumWorkers(hintNumWorkers);
+
+      Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+      assertThat(
+          "DatastoreIO write should include the project in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("projectId")));
+      assertThat(
+          "DatastoreIO write should include the upsertFn in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("upsertFn")));
+      assertThat(
+          "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+          displayData,
+          hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
+    }
 
-  /** Test that entities with incomplete keys cannot be updated. */
-  @Test
-  public void testAddEntitiesWithIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    UpsertFn upsertFn = new UpsertFn();
+    @Test
+    public void testWritePrimitiveDisplayDataDisabledThrottler() {
+      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+      PTransform<PCollection<Entity>, ?> write =
+          DatastoreIO.v1().write().withProjectId("myProject").withRampupThrottlingDisabled();
+
+      Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+      assertThat(
+          "DatastoreIO write should include the project in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("projectId")));
+      assertThat(
+          "DatastoreIO write should include the upsertFn in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("upsertFn")));
+      assertThat(
+          "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+          displayData,
+          not(hasItem(hasDisplayItem("hintNumWorkers"))));
+    }
 
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Entities to be written to the Cloud Datastore must have complete keys");
+    @Test
+    public void testDeleteEntityPrimitiveDisplayData() {
+      int hintNumWorkers = 10;
+      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+      PTransform<PCollection<Entity>, ?> write =
+          DatastoreIO.v1()
+              .deleteEntity()
+              .withProjectId("myProject")
+              .withHintNumWorkers(hintNumWorkers);
+
+      Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+      assertThat(
+          "DatastoreIO write should include the project in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("projectId")));
+      assertThat(
+          "DatastoreIO write should include the deleteEntityFn in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("deleteEntityFn")));
+      assertThat(
+          "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+          displayData,
+          hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
+    }
 
-    upsertFn.apply(entity);
-  }
+    @Test
+    public void testDeleteKeyPrimitiveDisplayData() {
+      int hintNumWorkers = 10;
+      DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+      PTransform<PCollection<Key>, ?> write =
+          DatastoreIO.v1()
+              .deleteKey()
+              .withProjectId("myProject")
+              .withHintNumWorkers(hintNumWorkers);
+
+      Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+      assertThat(
+          "DatastoreIO write should include the project in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("projectId")));
+      assertThat(
+          "DatastoreIO write should include the deleteKeyFn in its primitive display data",
+          displayData,
+          hasItem(hasDisplayItem("deleteKeyFn")));
+      assertThat(
+          "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+          displayData,
+          hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
+    }
 
-  @Test
-  /** Test that entities with valid keys are transformed to upsert mutations. */
-  public void testAddEntities() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    UpsertFn upsertFn = new UpsertFn();
+    /** Test building a Write using builder methods. */
+    @Test
+    public void testBuildWrite() throws Exception {
+      DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+      assertEquals(PROJECT_ID, write.getProjectId());
+    }
 
-    Mutation expectedMutation = makeUpsert(entity).build();
-    assertEquals(expectedMutation, upsertFn.apply(entity));
-  }
+    /** Test the detection of complete and incomplete keys. */
+    @Test
+    public void testHasNameOrId() {
+      Key key;
+      // Complete with name, no ancestor
+      key = makeKey("bird", "finch").build();
+      assertTrue(isValidKey(key));
+
+      // Complete with id, no ancestor
+      key = makeKey("bird", 123).build();
+      assertTrue(isValidKey(key));
+
+      // Incomplete, no ancestor
+      key = makeKey("bird").build();
+      assertFalse(isValidKey(key));
+
+      // Complete with name and ancestor
+      key = makeKey("bird", "owl").build();
+      key = makeKey(key, "bird", "horned").build();
+      assertTrue(isValidKey(key));
+
+      // Complete with id and ancestor
+      key = makeKey("bird", "owl").build();
+      key = makeKey(key, "bird", 123).build();
+      assertTrue(isValidKey(key));
+
+      // Incomplete with ancestor
+      key = makeKey("bird", "owl").build();
+      key = makeKey(key, "bird").build();
+      assertFalse(isValidKey(key));
+
+      key = makeKey().build();
+      assertFalse(isValidKey(key));
+    }
 
-  /** Test that entities with incomplete keys cannot be deleted. */
-  @Test
-  public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+    /** Test that entities with incomplete keys cannot be updated. */
+    @Test
+    public void testAddEntitiesWithIncompleteKeys() throws Exception {
+      Key key = makeKey("bird").build();
+      Entity entity = Entity.newBuilder().setKey(key).build();
+      UpsertFn upsertFn = new UpsertFn();
 
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Entities to be deleted from the Cloud Datastore must have complete keys");
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("Entities to be written to the Cloud Datastore must have complete keys");
 
-    deleteEntityFn.apply(entity);
-  }
+      upsertFn.apply(entity);
+    }
 
-  /** Test that entities with valid keys are transformed to delete mutations. */
-  @Test
-  public void testDeleteEntities() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+    @Test
+    /** Test that entities with valid keys are transformed to upsert mutations. */
+    public void testAddEntities() throws Exception {
+      Key key = makeKey("bird", "finch").build();
+      Entity entity = Entity.newBuilder().setKey(key).build();
+      UpsertFn upsertFn = new UpsertFn();
 
-    Mutation expectedMutation = makeDelete(entity.getKey()).build();
-    assertEquals(expectedMutation, deleteEntityFn.apply(entity));
-  }
+      Mutation expectedMutation = makeUpsert(entity).build();
+      assertEquals(expectedMutation, upsertFn.apply(entity));
+    }
 
-  /** Test that incomplete keys cannot be deleted. */
-  @Test
-  public void testDeleteIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+    /** Test that entities with incomplete keys cannot be deleted. */
+    @Test
+    public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
+      Key key = makeKey("bird").build();
+      Entity entity = Entity.newBuilder().setKey(key).build();
+      DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
 
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Keys to be deleted from the Cloud Datastore must be complete");
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage(
+          "Entities to be deleted from the Cloud Datastore must have complete keys");
 
-    deleteKeyFn.apply(key);
-  }
+      deleteEntityFn.apply(entity);
+    }
 
-  /** Test that valid keys are transformed to delete mutations. */
-  @Test
-  public void testDeleteKeys() {
-    Key key = makeKey("bird", "finch").build();
-    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+    /** Test that entities with valid keys are transformed to delete mutations. */
+    @Test
+    public void testDeleteEntities() throws Exception {
+      Key key = makeKey("bird", "finch").build();
+      Entity entity = Entity.newBuilder().setKey(key).build();
+      DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
 
-    Mutation expectedMutation = makeDelete(key).build();
-    assertEquals(expectedMutation, deleteKeyFn.apply(key));
-  }
+      Mutation expectedMutation = makeDelete(entity.getKey()).build();
+      assertEquals(expectedMutation, deleteEntityFn.apply(entity));
+    }
 
-  @Test
-  public void testDatastoreWriteFnDisplayData() {
-    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null);
-    DisplayData displayData = DisplayData.from(datastoreWriter);
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
+    /** Test that incomplete keys cannot be deleted. */
+    @Test
+    public void testDeleteIncompleteKeys() throws Exception {
+      Key key = makeKey("bird").build();
+      DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
 
-  /** Tests {@link DatastoreWriterFn} with entities less than one batch. */
-  @Test
-  public void testDatatoreWriterFnWithOneBatch() throws Exception {
-    datastoreWriterFnTest(100);
-    verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
-  }
+      thrown.expect(IllegalArgumentException.class);
+      thrown.expectMessage("Keys to be deleted from the Cloud Datastore must be complete");
 
-  /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */
-  @Test
-  public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
-    datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
-    verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 5);
-  }
+      deleteKeyFn.apply(key);
+    }
 
-  /**
-   * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
-   * write batch size.
-   */
-  @Test
-  public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
-    datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
-    verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
-  }
+    /** Test that valid keys are transformed to delete mutations. */
+    @Test
+    public void testDeleteKeys() {
+      Key key = makeKey("bird", "finch").build();
+      DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
 
-  // A helper method to test DatastoreWriterFn for various batch sizes.
-  private void datastoreWriterFnTest(int numMutations) throws Exception {
-    // Create the requested number of mutations.
-    List<Mutation> mutations = new ArrayList<>(numMutations);
-    for (int i = 0; i < numMutations; ++i) {
-      mutations.add(
-          makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+      Mutation expectedMutation = makeDelete(key).build();
+      assertEquals(expectedMutation, deleteKeyFn.apply(key));
     }
 
-    DatastoreWriterFn datastoreWriter =
-        new DatastoreWriterFn(
-            StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
-    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    doFnTester.processBundle(mutations);
-
-    int start = 0;
-    while (start < numMutations) {
-      int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START);
-      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-      commitRequest.addAllMutations(mutations.subList(start, end));
-      // Verify all the batch requests were made with the expected mutations.
-      verify(mockDatastore, times(1)).commit(commitRequest.build());
-      start = end;
+    @Test
+    public void testDatastoreWriteFnDisplayData() {
+      DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null);
+      DisplayData displayData = DisplayData.from(datastoreWriter);
+      assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
     }
-  }
 
-  /**
-   * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
-   */
-  @Test
-  public void testDatatoreWriterFnWithLargeEntities() throws Exception {
-    List<Mutation> mutations = new ArrayList<>();
-    int entitySize = 0;
-    for (int i = 0; i < 12; ++i) {
-      Entity entity =
-          Entity.newBuilder()
-              .setKey(makeKey("key" + i, i + 1))
-              .putProperties(
-                  "long",
-                  makeValue(new String(new char[900_000])).setExcludeFromIndexes(true).build())
-              .build();
-      entitySize = entity.getSerializedSize(); // Take the size of any one entity.
-      mutations.add(makeUpsert(entity).build());
+    /** Tests {@link DatastoreWriterFn} with fewer entities than fit in one batch. */
+    @Test
+    public void testDatatoreWriterFnWithOneBatch() throws Exception {
+      datastoreWriterFnTest(100);
+      verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
     }
 
-    DatastoreWriterFn datastoreWriter =
-        new DatastoreWriterFn(
-            StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
-    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    doFnTester.processBundle(mutations);
-
-    // This test is over-specific currently; it requires that we split the 12 entity writes into 3
-    // requests, but we only need each CommitRequest to be less than 10MB in size.
-    int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
-    int start = 0;
-    while (start < mutations.size()) {
-      int end = Math.min(mutations.size(), start + entitiesPerRpc);
-      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-      commitRequest.addAllMutations(mutations.subList(start, end));
-      // Verify all the batch requests were made with the expected mutations.
-      verify(mockDatastore).commit(commitRequest.build());
-      start = end;
+    /**
+     * Tests {@link DatastoreWriterFn} with entities spanning more than one batch, but not an exact
+     * multiple of the batch size.
+     */
+    @Test
+    public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
+      datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
+      verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 5);
     }
-  }
 
-  /** Tests {@link DatastoreWriterFn} with a failed request which is retried. */
-  @Test
-  public void testDatatoreWriterFnRetriesErrors() throws Exception {
-    List<Mutation> mutations = new ArrayList<>();
-    int numRpcs = 2;
-    for (int i = 0; i < DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
-      mutations.add(
-          makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+    /**
+     * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
+     * write batch size.
+     */
+    @Test
+    public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
+      datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
+      verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
     }
 
-    CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
-    when(mockDatastore.commit(any(CommitRequest.class)))
-        .thenReturn(successfulCommit)
-        .thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
-        .thenReturn(successfulCommit);
-
-    DatastoreWriterFn datastoreWriter =
-        new DatastoreWriterFn(
-            StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
-    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    doFnTester.processBundle(mutations);
-    verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
-    verifyMetricWasSet("BatchDatastoreWrite", "unknown", "", 1);
-  }
-
-  /**
-   * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
-   * query.
-   */
-  @Test
-  public void testEstimatedSizeBytes() throws Exception {
-    long entityBytes = 100L;
-    // In seconds
-    long timestamp = 1234L;
-
-    RunQueryRequest latestTimestampRequest =
-        makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE);
-    RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
-    // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
-    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
-    when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
-    when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
-
-    assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
-    verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
-    verify(mockDatastore, times(1)).runQuery(statRequest);
-  }
-
-  /** Tests {@link SplitQueryFn} when number of query splits is specified. */
-  @Test
-  public void testSplitQueryFnWithNumSplits() throws Exception {
-    int numSplits = 100;
-    when(mockQuerySplitter.getSplits(
-            eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
-        .thenReturn(splitQuery(QUERY, numSplits));
+    // A helper method to test DatastoreWriterFn for various batch sizes.
+    private void datastoreWriterFnTest(int numMutations) throws Exception {
+      // Create the requested number of mutations.
+      List<Mutation> mutations = new ArrayList<>(numMutations);
+      for (int i = 0; i < numMutations; ++i) {
+        mutations.add(
+            makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+      }
+
+      DatastoreWriterFn datastoreWriter =
+          new DatastoreWriterFn(
+              StaticValueProvider.of(PROJECT_ID),
+              null,
+              mockDatastoreFactory,
+              new FakeWriteBatcher());
+      DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+      doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+      doFnTester.processBundle(mutations);
+
+      int start = 0;
+      while (start < numMutations) {
+        int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START);
+        CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+        commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+        commitRequest.addAllMutations(mutations.subList(start, end));
+        // Verify all the batch requests were made with the expected mutations.
+        verify(mockDatastore, times(1)).commit(commitRequest.build());
+        start = end;
+      }
+    }
 
-    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
-    DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
     /**
-     * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through
-     * mock factory using a when clause for unit testing purposes, it is not serializable because it
-     * doesn't have a no-arg constructor. Thus disabling the cloning to prevent the doFn from being
-     * serialized.
+     * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
      */
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<Query> queries = doFnTester.processBundle(QUERY);
-
-    assertEquals(queries.size(), numSplits);
+    @Test
+    public void testDatatoreWriterFnWithLargeEntities() throws Exception {
+      List<Mutation> mutations = new ArrayList<>();
+      int entitySize = 0;
+      for (int i = 0; i < 12; ++i) {
+        Entity entity =
+            Entity.newBuilder()
+                .setKey(makeKey("key" + i, i + 1))
+                .putProperties(
+                    "long",
+                    makeValue(new String(new char[900_000])).setExcludeFromIndexes(true).build())
+                .build();
+        entitySize = entity.getSerializedSize(); // Take the size of any one entity.
+        mutations.add(makeUpsert(entity).build());
+      }
+
+      DatastoreWriterFn datastoreWriter =
+          new DatastoreWriterFn(
+              StaticValueProvider.of(PROJECT_ID),
+              null,
+              mockDatastoreFactory,
+              new FakeWriteBatcher());
+      DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+      doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+      doFnTester.processBundle(mutations);
+
+      // This test is over-specific currently; it requires that we split the 12 entity writes into 3
+      // requests, but we only need each CommitRequest to be less than 10MB in size.
+      int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
+      int start = 0;
+      while (start < mutations.size()) {
+        int end = Math.min(mutations.size(), start + entitiesPerRpc);
+        CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+        commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+        commitRequest.addAllMutations(mutations.subList(start, end));
+        // Verify all the batch requests were made with the expected mutations.
+        verify(mockDatastore).commit(commitRequest.build());
+        start = end;
+      }
+    }
 
-    // Confirms that sub-queries are not equal to original when there is more than one split.
-    for (Query subQuery : queries) {
-      assertNotEquals(subQuery, QUERY);
+    /** Tests {@link DatastoreWriterFn} with a failed request which is retried. */
+    @Test
+    public void testDatatoreWriterFnRetriesErrors() throws Exception {
+      List<Mutation> mutations = new ArrayList<>();
+      int numRpcs = 2;
+      for (int i = 0; i < DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
+        mutations.add(
+            makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+      }
+
+      CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
+      when(mockDatastore.commit(any(CommitRequest.class)))
+          .thenReturn(successfulCommit)
+          .thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
+          .thenReturn(successfulCommit);
+
+      DatastoreWriterFn datastoreWriter =
+          new DatastoreWriterFn(
+              StaticValueProvider.of(PROJECT_ID),
+              null,
+              mockDatastoreFactory,
+              new FakeWriteBatcher());
+      DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+      doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+      doFnTester.processBundle(mutations);
+      verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
+      verifyMetricWasSet("BatchDatastoreWrite", "unknown", "", 1);
     }
 
-    verify(mockQuerySplitter, times(1))
-        .getSplits(eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
-    verifyZeroInteractions(mockDatastore);
-  }
+    /** Test options. */
+    public interface RuntimeTestOptions extends PipelineOptions {
+      ValueProvider<String> getDatastoreProject();
 
-  /** Tests {@link SplitQueryFn} when no query splits is specified. */
-  @Test
-  public void testSplitQueryFnWithoutNumSplits() throws Exception {
-    // Force SplitQueryFn to compute the number of query splits
-    int numSplits = 0;
-    int expectedNumSplits = 20;
-    long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
-    // In seconds
-    long timestamp = 1234L;
-
-    RunQueryRequest latestTimestampRequest =
-        makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE);
-    RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
-
-    // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
-    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
-    when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
-    when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
-    when(mockQuerySplitter.getSplits(
-            eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
-        .thenReturn(splitQuery(QUERY, expectedNumSplits));
-
-    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
-    DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<Query> queries = doFnTester.processBundle(QUERY);
+      void setDatastoreProject(ValueProvider<String> value);
 
-    assertEquals(expectedNumSplits, queries.size());
-    verify(mockQuerySplitter, times(1))
-        .getSplits(eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
-    verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
-    verify(mockDatastore, times(1)).runQuery(statRequest);
-  }
+      ValueProvider<String> getGqlQuery();
 
-  /** Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. */
-  @Test
-  public void testSplitQueryFnWithQueryLimit() throws Exception {
-    Query queryWithLimit = QUERY.toBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
+      void setGqlQuery(ValueProvider<String> value);
 
-    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory);
-    DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<Query> queries = doFnTester.processBundle(queryWithLimit);
+      ValueProvider<String> getNamespace();
 
-    assertEquals(1, queries.size());
-    verifyNoMoreInteractions(mockDatastore);
-    verifyNoMoreInteractions(mockQuerySplitter);
-  }
+      void setNamespace(ValueProvider<String> value);
+    }
 
-  /** Tests {@link ReadFn} with a query limit less than one batch. */
-  @Test
-  public void testReadFnWithOneBatch() throws Exception {
-    readFnTest(5);
-    verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
-  }
+    /**
+     * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction
+     * time when built with {@link DatastoreV1.Read#withQuery(Query)}.
+     */
+    @Test
+    public void testRuntimeOptionsNotCalledInApplyQuery() {
+      RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+      Pipeline pipeline = TestPipeline.create(options);
+      pipeline
+          .apply(
+              DatastoreIO.v1()
+                  .read()
+                  .withProjectId(options.getDatastoreProject())
+                  .withQuery(QUERY)
+                  .withNamespace(options.getNamespace()))
+          .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
+    }
 
-  /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */
-  @Test
-  public void testReadFnWithMultipleBatches() throws Exception {
-    readFnTest(QUERY_BATCH_LIMIT + 5);
-    verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 2);
-  }
+    /**
+     * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction
+     * time when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}.
+     */
+    @Test
+    public void testRuntimeOptionsNotCalledInApplyGqlQuery() {
+      RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+      Pipeline pipeline = TestPipeline.create(options);
+      pipeline
+          .apply(
+              DatastoreIO.v1()
+                  .read()
+                  .withProjectId(options.getDatastoreProject())
+                  .withLiteralGqlQuery(options.getGqlQuery()))
+          .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
+    }
 
-  /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
-  @Test
-  public void testReadFnWithBatchesExactMultiple() throws Exception {
-    readFnTest(5 * QUERY_BATCH_LIMIT);
-    verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 5);
-  }
+    @Test
+    public void testWriteBatcherWithoutData() {
+      DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+      writeBatcher.start();
+      assertEquals(
+          DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
+    }
 
-  /** Tests that {@link ReadFn} retries after an error. */
-  @Test
-  public void testReadFnRetriesErrors() throws Exception {
-    // An empty query to read entities.
-    Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
+    @Test
+    public void testWriteBatcherFastQueries() {
+      DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+      writeBatcher.start();
+      writeBatcher.addRequestLatency(0, 1000, 200);
+      writeBatcher.addRequestLatency(0, 1000, 200);
+      assertEquals(
+          DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
+    }
 
-    // Use mockResponseForQuery to generate results.
-    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
-        .thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
-        .thenAnswer(
-            invocationOnMock -> {
-              Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
-              return mockResponseForQuery(q);
-            });
+    @Test
+    public void testWriteBatcherSlowQueries() {
+      DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+      writeBatcher.start();
+      writeBatcher.addRequestLatency(0, 10000, 200);
+      writeBatcher.addRequestLatency(0, 10000, 200);
+      assertEquals(120, writeBatcher.nextBatchSize(0));
+    }
 
-    ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
-    DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    doFnTester.processBundle(query);
-    verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
-    verifyMetricWasSet("BatchDatastoreRead", "unknown", NAMESPACE, 1);
-  }
+    @Test
+    public void testWriteBatcherSizeNotBelowMinimum() {
+      DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+      writeBatcher.start();
+      writeBatcher.addRequestLatency(0, 75000, 50);
+      writeBatcher.addRequestLatency(0, 75000, 50);
+      assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
+    }
 
-  @Test
-  public void testTranslateGqlQueryWithLimit() throws Exception {
-    String gql = "SELECT * from DummyKind LIMIT 10";
-    String gqlWithZeroLimit = gql + " LIMIT 0";
-    GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
-    GqlQuery gqlQueryWithZeroLimit =
-        GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
-    RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace());
-    RunQueryRequest gqlRequestWithZeroLimit =
-        makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
-    when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
-        .thenThrow(
-            new DatastoreException(
-                "runQuery",
-                Code.INVALID_ARGUMENT,
-                "invalid query",
-                // dummy
-                new RuntimeException()));
-    when(mockDatastore.runQuery(gqlRequest))
-        .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
-    assertEquals(
-        translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), QUERY);
-    verify(mockDatastore, times(1)).runQuery(gqlRequest);
-    verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+    @Test
+    public void testWriteBatcherSlidingWindow() {
+      DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+      writeBatcher.start();
+      writeBatcher.addRequestLatency(0, 30000, 50);
+      writeBatcher.addRequestLatency(50000, 8000, 200);
+      writeBatcher.addRequestLatency(100000, 8000, 200);
+      assertEquals(150, writeBatcher.nextBatchSize(150000));
+    }
   }
 
-  @Test
-  public void testTranslateGqlQueryWithNoLimit() throws Exception {
-    String gql = "SELECT * from DummyKind";
-    String gqlWithZeroLimit = gql + " LIMIT 0";
-    GqlQuery gqlQueryWithZeroLimit =
-        GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
-    RunQueryRequest gqlRequestWithZeroLimit =
-        makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
-    when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
-        .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
-    assertEquals(
-        translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), QUERY);
-    verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
-  }
+  @RunWith(Parameterized.class)
+  public static class ParameterizedTests extends DatastoreV1Test {
+    @Parameter(0)
+    public Instant readTime;
 
-  @Test
-  public void testTranslateGqlQueryWithException() throws Exception {
-    String gql = "SELECT * from DummyKind";
-    String gqlWithZeroLimit = gql + " LIMIT 0";
-    GqlQuery gqlQueryWithZeroLimit =
-        GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
-    RunQueryRequest gqlRequestWithZeroLimit =
-        makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
-    when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
-        .thenThrow(new RuntimeException("TestException"));
-
-    thrown.expect(RuntimeException.class);
-    thrown.expectMessage("TestException");
-    translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace());
-  }
+    @Parameter(1)
+    public Timestamp readTimeProto;
 
-  /** Test options. * */
-  public interface RuntimeTestOptions extends PipelineOptions {
-    ValueProvider<String> getDatastoreProject();
+    @Parameters(name = "readTime = {0}, readTimeProto = {1}")
+    public static Collection<Object[]> data() {
+      return Arrays.asList(new Object[] {null, null}, new Object[] {TIMESTAMP, TIMESTAMP_PROTO});
+    }
 
-    void setDatastoreProject(ValueProvider<String> value);
+    /**
+     * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
+     * query.
+     */
+    @Test
+    public void testEstimatedSizeBytes() throws Exception {
+      long entityBytes = 100L;
+      // In seconds
+      long timestamp = 1234L;
+
+      RunQueryRequest latestTimestampRequest =
+          makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE, readTime);
+      RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
+      // Per Kind statistics request and response
+      RunQueryRequest statRequest =
+          makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE, readTime);
+      RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+      when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
+      when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
+
+      assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE, readTime));
+      verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
+      verify(mockDatastore, times(1)).runQuery(statRequest);
+    }
 
-    ValueProvider<String> getGqlQuery();
+    /** Tests {@link SplitQueryFn} when number of query splits is specified. */
+    @Test
+    public void testSplitQueryFnWithNumSplits() throws Exception {
+      int numSplits = 100;
+
+      when(mockQuerySplitter.getSplits(
+              eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
+          .thenReturn(splitQuery(QUERY, numSplits));
+      when(mockQuerySplitter.getSplits(
+              eq(QUERY),
+              any(PartitionId.class),
+              eq(numSplits),
+              any(Datastore.class),
+              eq(readTimeProto)))
+          .thenReturn(splitQuery(QUERY, numSplits));
+
+      SplitQueryFn splitQueryFn =
+          new SplitQueryFn(V_1_OPTIONS, numSplits, readTime, mockDatastoreFactory);
+      DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
+      /**
+       * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected
+       * through mock factory using a when clause for unit testing purposes, it is not serializable
+       * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
+       * doFn from being serialized.
+       */
+      doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+      List<Query> queries = doFnTester.processBundle(QUERY);
+
+      assertEquals(queries.size(), numSplits);
+
+      // Confirms that sub-queries are not equal to original when there is more than one split.
+      for (Query subQuery : queries) {
+        assertNotEquals(subQuery, QUERY);
+      }
+
+      if (readTime == null) {
+        verify(mockQuerySplitter, times(1))
+            .getSplits(eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
+      } else {
+        verify(mockQuerySplitter, times(1))
+            .getSplits(
+                eq(QUERY),
+                any(PartitionId.class),
+                eq(numSplits),
+                any(Datastore.class),
+                eq(readTimeProto));
+      }
+      verifyZeroInteractions(mockDatastore);
+    }
 
-    void setGqlQuery(ValueProvider<String> value);
+    /** Tests {@link SplitQueryFn} when the number of query splits is not specified. */
+    @Test
+    public void testSplitQueryFnWithoutNumSplits() throws Exception {
+      // Force SplitQueryFn to compute the number of query splits
+      int numSplits = 0;
+      int expectedNumSplits = 20;
+      long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
+      // In seconds
+      long timestamp = 1234L;
+
+      RunQueryRequest latestTimestampRequest =
+          makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE, readTime);
+      RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
+
+      // Per Kind statistics request and response
+      RunQueryRequest statRequest =
+          makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE, readTime);
+      RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+      when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
+      when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
+      when(mockQuerySplitter.getSplits(
+              eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
+          .thenReturn(splitQuery(QUERY, expectedNumSplits));
+      when(mockQuerySplitter.getSplits(
+              eq(QUERY),
+              any(PartitionId.class),
+              eq(expectedNumSplits),
+              any(Datastore.class),
+              eq(readTimeProto)))
+          .thenReturn(splitQuery(QUERY, expectedNumSplits));
+
+      SplitQueryFn splitQueryFn =
+          new SplitQueryFn(V_1_OPTIONS, numSplits, readTime, mockDatastoreFactory);
+      DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
+      doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+      List<Query> queries = doFnTester.processBundle(QUERY);
+
+      assertEquals(expectedNumSplits, queries.size());
+      if (readTime == null) {
+        verify(mockQuerySplitter, times(1))
+            .getSplits(
+                eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
+      } else {
+        verify(mockQuerySplitter, times(1))
+            .getSplits(
+                eq(QUERY),
+                any(PartitionId.class),
+                eq(expectedNumSplits),
+                any(Datastore.class),
+                eq(readTimeProto));
+      }
+      verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
+      verify(mockDatastore, times(1)).runQuery(statRequest);
+    }
 
-    ValueProvider<String> getNamespace();
+    /** Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. */
+    @Test
+    public void testSplitQueryFnWithQueryLimit() throws Exception {
+      Query queryWithLimit =
+          QUERY.toBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
 
-    void setNamespace(ValueProvider<String> value);
-  }
+      SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, readTime, mockDatastoreFactory);
+      DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
+      doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+      List<Query> queries = doFnTester.processBundle(queryWithLimit);
 
-  /**
-   * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time
-   * when built with {@link DatastoreV1.Read#withQuery(Query)}.
-   */
-  @Test
-  public void testRuntimeOptionsNotCalledInApplyQuery() {
-    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
-    Pipeline pipeline = TestPipeline.create(options);
-    pipeline
-        .apply(
-            DatastoreIO.v1()
-                .read()
-                .withProjectId(options.getDatastoreProject())
-                .withQuery(QUERY)
-                .withNamespace(options.getNamespace()))
-        .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
-  }
+      assertEquals(1, queries.size());
+      verifyNoMoreInteractions(mockDatastore);
+      verifyNoMoreInteractions(mockQuerySplitter);
+    }
 
-  /**
-   * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time
-   * when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}.
-   */
-  @Test
-  public void testRuntimeOptionsNotCalledInApplyGqlQuery() {
-    RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
-    Pipeline pipeline = TestPipeline.create(options);
-    pipeline
-        .apply(
-            DatastoreIO.v1()
-                .read()
-                .withProjectId(options.getDatastoreProject())
-                .withLiteralGqlQuery(options.getGqlQuery()))
-        .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
-  }
+    /** Tests {@link ReadFn} with a query limit less than one batch. */
+    @Test
+    public void testReadFnWithOneBatch() throws Exception {
+      readFnTest(5, readTime);
+      verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
+    }
 
-  @Test
-  public void testWriteBatcherWithoutData() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
-  }
+    /** Tests {@link ReadFn} with a query limit of more than one batch, and not a multiple. */
+    @Test
+    public void testReadFnWithMultipleBatches() throws Exception {
+      readFnTest(QUERY_BATCH_LIMIT + 5, readTime);
+      verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 2);
+    }
 
-  @Test
-  public void testWriteBatcherFastQueries() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 1000, 200);
-    writeBatcher.addRequestLatency(0, 1000, 200);
-    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
-  }
+    /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
+    @Test
+    public void testReadFnWithBatchesExactMultiple() throws Exception {
+      readFnTest(5 * QUERY_BATCH_LIMIT, readTime);
+      verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 5);
+    }
 
-  @Test
-  public void testWriteBatcherSlowQueries() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 10000, 200);
-    writeBatcher.addRequestLatency(0, 10000, 200);
-    assertEquals(120, writeBatcher.nextBatchSize(0));
-  }
+    /** Tests that {@link ReadFn} retries after an error. */
+    @Test
+    public void testReadFnRetriesErrors() throws Exception {
+      // An empty query to read entities.
+      Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
+
+      // Use mockResponseForQuery to generate results.
+      when(mockDatastore.runQuery(any(RunQueryRequest.class)))
+          .thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
+          .thenAnswer(
+              invocationOnMock -> {
+                Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
+                return mockResponseForQuery(q);
+              });
+
+      ReadFn readFn = new ReadFn(V_1_OPTIONS, readTime, mockDatastoreFactory);
+      DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
+      doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+      doFnTester.processBundle(query);
+      verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
+      verifyMetricWasSet("BatchDatastoreRead", "unknown", NAMESPACE, 1);
+    }
 
-  @Test
-  public void testWriteBatcherSizeNotBelowMinimum() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 75000, 50);
-    writeBatcher.addRequestLatency(0, 75000, 50);
-    assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
-  }
+    @Test
+    public void testTranslateGqlQueryWithLimit() throws Exception {
+      String gql = "SELECT * from DummyKind LIMIT 10";
+      String gqlWithZeroLimit = gql + " LIMIT 0";
+      GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
+      GqlQuery gqlQueryWithZeroLimit =
+          GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
+
+      RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace(), readTime);
+      RunQueryRequest gqlRequestWithZeroLimit =
+          makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime);
+      when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+          .thenThrow(
+              new DatastoreException(
+                  "runQuery",
+                  Code.INVALID_ARGUMENT,
+                  "invalid query",
+                  // dummy
+                  new RuntimeException()));
+      when(mockDatastore.runQuery(gqlRequest))
+          .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
+      assertEquals(
+          translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime),
+          QUERY);
+      verify(mockDatastore, times(1)).runQuery(gqlRequest);
+      verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+    }
+
+    @Test
+    public void testTranslateGqlQueryWithNoLimit() throws Exception {
+      String gql = "SELECT * from DummyKind";
+      String gqlWithZeroLimit = gql + " LIMIT 0";
+      GqlQuery gqlQueryWithZeroLimit =
+          GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
+
+      RunQueryRequest gqlRequestWithZeroLimit =
+          makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime);
+      when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+          .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
+      assertEquals(
+          translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime),
+          QUERY);
+      verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+    }
 
-  @Test
-  public void testWriteBatcherSlidingWindow() {
-    DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
-    writeBatcher.start();
-    writeBatcher.addRequestLatency(0, 30000, 50);
-    writeBatcher.addRequestLatency(50000, 8000, 200);
-    writeBatcher.addRequestLatency(100000, 8000, 200);
-    assertEquals(150, writeBatcher.nextBatchSize(150000));
+    @Test
+    public void testTranslateGqlQueryWithException() throws Exception {
+      String gql = "SELECT * from DummyKind";
+      String gqlWithZeroLimit = gql + " LIMIT 0";
+      GqlQuery gqlQueryWithZeroLimit =
+          GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
+      RunQueryRequest gqlRequestWithZeroLimit =
+          makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime);
+      when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+          .thenThrow(new RuntimeException("TestException"));
+
+      thrown.expect(RuntimeException.class);
+      thrown.expectMessage("TestException");
+      translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime);
+    }
   }
 
   /** Helper Methods */
@@ -963,7 +1083,7 @@ public class DatastoreV1Test {
   }
 
   /** Helper function to run a test reading from a {@link ReadFn}. */
-  private void readFnTest(int numEntities) throws Exception {
+  protected void readFnTest(int numEntities, Instant readTime) throws Exception {
     // An empty query to read entities.
     Query query =
         Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(numEntities)).build();
@@ -976,7 +1096,7 @@ public class DatastoreV1Test {
               return mockResponseForQuery(q);
             });
 
-    ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
+    ReadFn readFn = new ReadFn(V_1_OPTIONS, readTime, mockDatastoreFactory);
     DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
     /**
      * Although Datastore client is marked transient in {@link ReadFn}, when injected through mock
@@ -987,10 +1107,20 @@ public class DatastoreV1Test {
     doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
     List<Entity> entities = doFnTester.processBundle(query);
 
+    ArgumentCaptor<RunQueryRequest> requestCaptor = ArgumentCaptor.forClass(RunQueryRequest.class);
+
     int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
-    verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
+    verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(requestCaptor.capture());
     // Validate the number of results.
     assertEquals(numEntities, entities.size());
+    // Validate read Time.
+    RunQueryRequest request = requestCaptor.getValue();
+    if (readTime != null) {
+      assertEquals(
+          readTime.getMillis(), Timestamps.toMillis(request.getReadOptions().getReadTime()));
+    } else {
+      assertFalse(request.hasReadOptions());
+    }
   }
 
   /** Builds a per-kind statistics response with the given entity size. */
@@ -1050,7 +1180,7 @@ public class DatastoreV1Test {
   }
 
   /** Generate dummy query splits. */
-  private List<Query> splitQuery(Query query, int numSplits) {
+  private static List<Query> splitQuery(Query query, int numSplits) {
     List<Query> queries = new ArrayList<>();
     int offsetOfOriginal = query.getOffset();
     for (int i = 0; i < numSplits; i++) {
@@ -1082,7 +1212,8 @@ public class DatastoreV1Test {
     }
   }
 
-  private void verifyMetricWasSet(String method, String status, String namespace, long count) {
+  private static void verifyMetricWasSet(
+      String method, String status, String namespace, long count) {
     // Verify the metric as reported.
     HashMap<String, String> labels = new HashMap<>();
     labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
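
For reference, readFnTest above asserts that a configured read time arrives at Datastore as a
protobuf Timestamp carried in the RunQueryRequest's read options. Below is a minimal standalone
sketch of that conversion; the class and variable names are illustrative only, and it assumes the
ReadOptions read_time field from the upgraded datastore-v1 protos plus the protobuf-java-util
Timestamps helper:

    import com.google.datastore.v1.ReadOptions;
    import com.google.datastore.v1.RunQueryRequest;
    import com.google.protobuf.Timestamp;
    import com.google.protobuf.util.Timestamps;
    import org.joda.time.Instant;

    public class ReadTimeProtoSketch {
      public static void main(String[] args) {
        // Convert the joda Instant accepted by the connector into the proto Timestamp
        // carried under the request's read options (millisecond precision).
        Instant readTime = Instant.now();
        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());

        RunQueryRequest request =
            RunQueryRequest.newBuilder()
                .setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto))
                .build();

        // Mirrors the readFnTest check: the read time round-trips at millisecond precision.
        System.out.println(
            readTime.getMillis() == Timestamps.toMillis(request.getReadOptions().getReadTime()));
      }
    }

The parameterized unit tests above pass the same timestamp, as readTimeProto, to the query
splitter overload that accepts a read time.
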
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
index 6d0bd52f26d..ea00821f360 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -50,6 +52,8 @@ import org.junit.runners.JUnit4;
  */
 @RunWith(JUnit4.class)
 public class SplitQueryFnIT {
+  private Instant readTime = Instant.now().minus(Duration.standardSeconds(10));
+
   /** Tests {@link SplitQueryFn} to generate expected number of splits for a large dataset. */
   @Test
   public void testSplitQueryFnWithLargeDataset() throws Exception {
@@ -59,7 +63,8 @@ public class SplitQueryFnIT {
     // Num splits is computed based on the entity_bytes size of the input_sort_1G kind reported by
     // Datastore stats.
     int expectedNumSplits = 32;
-    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, null);
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, readTime);
   }
 
   /** Tests {@link SplitQueryFn} to fallback to NUM_QUERY_SPLITS_MIN for a small dataset. */
@@ -69,17 +74,23 @@ public class SplitQueryFnIT {
     String kind = "shakespeare";
     String namespace = null;
     int expectedNumSplits = NUM_QUERY_SPLITS_MIN;
-    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, null);
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, readTime);
   }
 
   /** A helper method to test {@link SplitQueryFn} to generate the expected number of splits. */
   private void testSplitQueryFn(
-      String projectId, String kind, @Nullable String namespace, int expectedNumSplits)
+      String projectId,
+      String kind,
+      @Nullable String namespace,
+      int expectedNumSplits,
+      @Nullable Instant readTime)
       throws Exception {
     Query.Builder query = Query.newBuilder();
     query.addKindBuilder().setName(kind);
 
-    SplitQueryFn splitQueryFn = new SplitQueryFn(V1Options.from(projectId, namespace, null), 0);
+    SplitQueryFn splitQueryFn =
+        new SplitQueryFn(V1Options.from(projectId, namespace, null), 0, readTime);
     DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
 
     List<Query> queries = doFnTester.processBundle(query.build());
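
The integration test that follows (V1ReadIT) drives the new withReadTime option end to end from a
user pipeline. A minimal, illustrative sketch of that usage is shown below; the project ID, kind
name, and default pipeline options are placeholders rather than values taken from the test:

    import com.google.datastore.v1.Entity;
    import com.google.datastore.v1.Query;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class DatastoreReadTimeExample {
      public static void main(String[] args) {
        // Build a simple kind query, mirroring the query construction in SplitQueryFnIT.
        Query.Builder queryBuilder = Query.newBuilder();
        queryBuilder.addKindBuilder().setName("MyKind"); // hypothetical kind
        Query query = queryBuilder.build();

        // Read the entities as they existed ten seconds ago.
        Instant readTime = Instant.now().minus(Duration.standardSeconds(10));

        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        PCollection<Entity> snapshot =
            pipeline.apply(
                DatastoreIO.v1()
                    .read()
                    .withProjectId("my-project") // hypothetical project
                    .withQuery(query)
                    .withReadTime(readTime));
        snapshot.apply(Count.globally());
        pipeline.run();
      }
    }

Entities written after readTime are not visible to such a read, which is exactly what V1ReadIT
asserts by writing a second batch of entities after capturing the read time.
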
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
index 55b53b3f0c9..7d6fc577038 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -48,7 +49,9 @@ public class V1ReadIT {
   private V1TestOptions options;
   private String project;
   private String ancestor;
-  private final long numEntities = 1000;
+  private final long numEntitiesBeforeReadTime = 600;
+  private final long totalNumEntities = 1000;
+  private Instant readTime;
 
   @Before
   public void setup() throws Exception {
@@ -57,7 +60,15 @@ public class V1ReadIT {
     project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
     ancestor = UUID.randomUUID().toString();
     // Create entities and write them to datastore
-    writeEntitiesToDatastore(options, project, ancestor, numEntities);
+    writeEntitiesToDatastore(options, project, ancestor, 0, numEntitiesBeforeReadTime);
+
+    Thread.sleep(1000);
+    readTime = Instant.now();
+    Thread.sleep(1000);
+
+    long moreEntitiesToWrite = totalNumEntities - numEntitiesBeforeReadTime;
+    writeEntitiesToDatastore(
+        options, project, ancestor, numEntitiesBeforeReadTime, moreEntitiesToWrite);
   }
 
   @After
@@ -77,6 +88,7 @@ public class V1ReadIT {
     Query query =
         V1TestUtil.makeAncestorKindQuery(options.getKind(), options.getNamespace(), ancestor);
 
+    // Read entities without readTime.
     DatastoreV1.Read read =
         DatastoreIO.v1()
             .read()
@@ -88,8 +100,23 @@ public class V1ReadIT {
     Pipeline p = Pipeline.create(options);
     PCollection<Long> count = p.apply(read).apply(Count.globally());
 
-    PAssert.thatSingleton(count).isEqualTo(numEntities);
+    PAssert.thatSingleton(count).isEqualTo(totalNumEntities);
     p.run();
+
+    // Read entities with readTime.
+    DatastoreV1.Read snapshotRead =
+        DatastoreIO.v1()
+            .read()
+            .withProjectId(project)
+            .withQuery(query)
+            .withNamespace(options.getNamespace())
+            .withReadTime(readTime);
+
+    Pipeline p2 = Pipeline.create(options);
+    PCollection<Long> count2 = p2.apply(snapshotRead).apply(Count.globally());
+
+    PAssert.thatSingleton(count2).isEqualTo(numEntitiesBeforeReadTime);
+    p2.run();
   }
 
   @Test
@@ -114,12 +141,13 @@ public class V1ReadIT {
             "SELECT * from %s WHERE __key__ HAS ANCESTOR KEY(%s, '%s')",
             options.getKind(), options.getKind(), ancestor);
 
-    long expectedNumEntities = numEntities;
+    long expectedNumEntities = totalNumEntities;
     if (limit > 0) {
       gqlQuery = String.format("%s LIMIT %d", gqlQuery, limit);
       expectedNumEntities = limit;
     }
 
+    // Read entities without readTime.
     DatastoreV1.Read read =
         DatastoreIO.v1()
             .read()
@@ -133,18 +161,36 @@ public class V1ReadIT {
 
     PAssert.thatSingleton(count).isEqualTo(expectedNumEntities);
     p.run();
+
+    // Read entities with readTime.
+    DatastoreV1.Read snapshotRead =
+        DatastoreIO.v1()
+            .read()
+            .withProjectId(project)
+            .withLiteralGqlQuery(gqlQuery)
+            .withNamespace(options.getNamespace())
+            .withReadTime(readTime);
+
+    Pipeline p2 = Pipeline.create(options);
+    PCollection<Long> count2 = p2.apply(snapshotRead).apply(Count.globally());
+
+    long expectedNumEntities2 = limit > 0 ? limit : numEntitiesBeforeReadTime;
+    PAssert.thatSingleton(count2).isEqualTo(expectedNumEntities2);
+    p2.run();
   }
 
   // Creates entities and write them to datastore
   private static void writeEntitiesToDatastore(
-      V1TestOptions options, String project, String ancestor, long numEntities) throws Exception {
+      V1TestOptions options, String project, String ancestor, long valueOffset, long numEntities)
+      throws Exception {
     Datastore datastore = getDatastore(options, project);
     // Write test entities to datastore
     V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
     Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
 
     for (long i = 0; i < numEntities; i++) {
-      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0);
+      Entity entity =
+          makeEntity(valueOffset + i, ancestorKey, options.getKind(), options.getNamespace(), 0);
       writer.write(entity);
     }
     writer.close();