Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/20 01:10:49 UTC

[1/4] incubator-beam git commit: Closes #858

Repository: incubator-beam
Updated Branches:
  refs/heads/master c57643f52 -> 921d0b2e7


Closes #858


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/921d0b2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/921d0b2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/921d0b2e

Branch: refs/heads/master
Commit: 921d0b2e752e7453332af651170142b1cd9a0cfa
Parents: c57643f 54e4cb1
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 19 18:10:39 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 18:10:39 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |    4 +-
 .../beam/examples/complete/AutoComplete.java    |   12 +-
 .../examples/cookbook/DatastoreWordCount.java   |   24 +-
 pom.xml                                         |    8 +-
 runners/google-cloud-dataflow-java/pom.xml      |    2 +-
 sdks/java/io/google-cloud-platform/pom.xml      |    4 +-
 .../beam/sdk/io/gcp/datastore/DatastoreIO.java  |    8 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 1034 ++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 1033 -----------------
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  792 ++++++++++++++
 .../sdk/io/gcp/datastore/V1Beta3ReadIT.java     |  114 --
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  |  792 --------------
 .../io/gcp/datastore/V1Beta3TestOptions.java    |   44 -
 .../sdk/io/gcp/datastore/V1Beta3TestUtil.java   |  382 -------
 .../sdk/io/gcp/datastore/V1Beta3WriteIT.java    |   85 --
 .../beam/sdk/io/gcp/datastore/V1ReadIT.java     |  114 ++
 .../sdk/io/gcp/datastore/V1TestOptions.java     |   44 +
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   |  382 +++++++
 .../beam/sdk/io/gcp/datastore/V1WriteIT.java    |   85 ++
 19 files changed, 2482 insertions(+), 2481 deletions(-)
----------------------------------------------------------------------



[4/4] incubator-beam git commit: DatastoreIO v1beta3 to v1

Posted by dh...@apache.org.
DatastoreIO v1beta3 to v1
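
For users of DatastoreIO the migration is mechanical: the com.google.datastore.v1beta3.* proto
and client packages become com.google.datastore.v1.*, the SDK class V1Beta3 becomes DatastoreV1,
and the entry point DatastoreIO.v1beta3() becomes DatastoreIO.v1(). A minimal before/after sketch
of the read path (the class, project id, and query below are placeholders, not part of this commit):

    import com.google.datastore.v1.Entity;
    import com.google.datastore.v1.Query;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
    import org.apache.beam.sdk.values.PCollection;

    public class ReadMigrationSketch {
      // Before: V1Beta3.Read read = DatastoreIO.v1beta3().read()
      //             .withProjectId(projectId).withQuery(query);
      static PCollection<Entity> readEntities(Pipeline p, String projectId, Query query) {
        DatastoreV1.Read read = DatastoreIO.v1().read()
            .withProjectId(projectId)  // placeholder project id
            .withQuery(query);         // a com.google.datastore.v1.Query
        return p.apply(read);
      }
    }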


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54e4cb12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54e4cb12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54e4cb12

Branch: refs/heads/master
Commit: 54e4cb123187992b64b1580869ae5857f0ef613b
Parents: c57643f
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Fri Aug 19 16:18:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 18:10:39 2016 -0700

----------------------------------------------------------------------
 examples/java/pom.xml                           |    4 +-
 .../beam/examples/complete/AutoComplete.java    |   12 +-
 .../examples/cookbook/DatastoreWordCount.java   |   24 +-
 pom.xml                                         |    8 +-
 runners/google-cloud-dataflow-java/pom.xml      |    2 +-
 sdks/java/io/google-cloud-platform/pom.xml      |    4 +-
 .../beam/sdk/io/gcp/datastore/DatastoreIO.java  |    8 +-
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  | 1034 ++++++++++++++++++
 .../beam/sdk/io/gcp/datastore/V1Beta3.java      | 1033 -----------------
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  792 ++++++++++++++
 .../sdk/io/gcp/datastore/V1Beta3ReadIT.java     |  114 --
 .../beam/sdk/io/gcp/datastore/V1Beta3Test.java  |  792 --------------
 .../io/gcp/datastore/V1Beta3TestOptions.java    |   44 -
 .../sdk/io/gcp/datastore/V1Beta3TestUtil.java   |  382 -------
 .../sdk/io/gcp/datastore/V1Beta3WriteIT.java    |   85 --
 .../beam/sdk/io/gcp/datastore/V1ReadIT.java     |  114 ++
 .../sdk/io/gcp/datastore/V1TestOptions.java     |   44 +
 .../beam/sdk/io/gcp/datastore/V1TestUtil.java   |  382 +++++++
 .../beam/sdk/io/gcp/datastore/V1WriteIT.java    |   85 ++
 19 files changed, 2482 insertions(+), 2481 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 21d7a3a..096bc4e 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -323,12 +323,12 @@
 
     <dependency>
       <groupId>com.google.cloud.datastore</groupId>
-      <artifactId>datastore-v1beta3-proto-client</artifactId>
+      <artifactId>datastore-v1-proto-client</artifactId>
     </dependency>
 
     <dependency>
       <groupId>com.google.cloud.datastore</groupId>
-      <artifactId>datastore-v1beta3-protos</artifactId>
+      <artifactId>datastore-v1-protos</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index baae100..120c64f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -18,8 +18,8 @@
 package org.apache.beam.examples.complete;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
 import org.apache.beam.examples.common.ExampleOptions;
@@ -59,9 +59,9 @@ import com.google.api.services.bigquery.model.TableReference;
 import com.google.api.services.bigquery.model.TableRow;
 import com.google.api.services.bigquery.model.TableSchema;
 import com.google.common.base.MoreObjects;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Value;
 
 import org.joda.time.Duration;
 
@@ -488,7 +488,7 @@ public class AutoComplete {
       toWrite
       .apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind(),
           options.getDatastoreAncestorKey())))
-      .apply(DatastoreIO.v1beta3().write().withProjectId(MoreObjects.firstNonNull(
+      .apply(DatastoreIO.v1().write().withProjectId(MoreObjects.firstNonNull(
           options.getOutputProject(), options.getProject())));
     }
     if (options.getOutputToBigQuery()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 21220b8..215e2ff 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -17,16 +17,16 @@
  */
 package org.apache.beam.examples.cookbook;
 
-import static com.google.datastore.v1beta3.client.DatastoreHelper.getString;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+import static com.google.datastore.v1.client.DatastoreHelper.getString;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
 
 import org.apache.beam.examples.WordCount;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -36,11 +36,11 @@ import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
 
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.Value;
 
 import java.util.Map;
 import java.util.UUID;
@@ -194,7 +194,7 @@ public class DatastoreWordCount {
       Pipeline p = Pipeline.create(options);
       p.apply("ReadLines", TextIO.Read.from(options.getInput()))
        .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
-       .apply(DatastoreIO.v1beta3().write().withProjectId(options.getProject()));
+       .apply(DatastoreIO.v1().write().withProjectId(options.getProject()));
 
       p.run();
   }
@@ -225,7 +225,7 @@ public class DatastoreWordCount {
     Query query = makeAncestorKindQuery(options);
 
     // For Datastore sources, the read namespace can be set on the entire query.
-    V1Beta3.Read read = DatastoreIO.v1beta3().read()
+    DatastoreV1.Read read = DatastoreIO.v1().read()
         .withProjectId(options.getProject())
         .withQuery(query)
         .withNamespace(options.getNamespace());
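
Note: makeAncestorKindQuery(options) is defined elsewhere in DatastoreWordCount and is not shown
in this hunk. Purely as an illustration of the renamed v1 helpers (this is not the file's actual
implementation), an ancestor kind query could be built like this:

    import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
    import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
    import static com.google.datastore.v1.client.DatastoreHelper.makeValue;

    import com.google.datastore.v1.PropertyFilter;
    import com.google.datastore.v1.Query;

    public class AncestorQuerySketch {
      // Illustrative only: selects all entities of the given kind under the
      // ancestor key (ancestorKind, ancestorName).
      static Query makeAncestorKindQuery(String kind, String ancestorKind, String ancestorName) {
        Query.Builder q = Query.newBuilder();
        q.addKindBuilder().setName(kind);
        q.setFilter(makeFilter(
            "__key__",
            PropertyFilter.Operator.HAS_ANCESTOR,
            makeValue(makeKey(ancestorKind, ancestorName).build()).build()));
        return q.build();
      }
    }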

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58adbe7..f9e0479 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,8 +107,8 @@
     <clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
     <dataflow.version>v1b3-rev36-1.22.0</dataflow.version>
     <dataflow.proto.version>0.5.160222</dataflow.proto.version>
-    <datastore.client.version>1.0.0-beta.2</datastore.client.version>
-    <datastore.proto.version>1.0.0-beta</datastore.proto.version>
+    <datastore.client.version>1.1.0</datastore.client.version>
+    <datastore.proto.version>1.0.1</datastore.proto.version>
     <google-auto-service.version>1.0-rc2</google-auto-service.version>
     <google-auto-value.version>1.1</google-auto-value.version>
     <google-clients.version>1.22.0</google-clients.version>
@@ -450,13 +450,13 @@
 
       <dependency>
         <groupId>com.google.cloud.datastore</groupId>
-        <artifactId>datastore-v1beta3-proto-client</artifactId>
+        <artifactId>datastore-v1-proto-client</artifactId>
         <version>${datastore.client.version}</version>
       </dependency>
 
       <dependency>
         <groupId>com.google.cloud.datastore</groupId>
-        <artifactId>datastore-v1beta3-protos</artifactId>
+        <artifactId>datastore-v1-protos</artifactId>
         <version>${datastore.proto.version}</version>
       </dependency>
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 00b5a9b..0044823 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -409,7 +409,7 @@
 
     <dependency>
       <groupId>com.google.cloud.datastore</groupId>
-      <artifactId>datastore-v1beta3-protos</artifactId>
+      <artifactId>datastore-v1-protos</artifactId>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 1596a66..8075335 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -115,12 +115,12 @@
 
     <dependency>
       <groupId>com.google.cloud.datastore</groupId>
-      <artifactId>datastore-v1beta3-proto-client</artifactId>
+      <artifactId>datastore-v1-proto-client</artifactId>
     </dependency>
 
     <dependency>
       <groupId>com.google.cloud.datastore</groupId>
-      <artifactId>datastore-v1beta3-protos</artifactId>
+      <artifactId>datastore-v1-protos</artifactId>
     </dependency>
 
     <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
index bde0aba..5abf015 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.annotations.Experimental;
  * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
  * versions of the Datastore Client libraries.
  *
- * <p>To use the v1beta3 version see {@link V1Beta3}.
+ * <p>To use the v1 version see {@link DatastoreV1}.
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
 public class DatastoreIO {
@@ -32,10 +32,10 @@ public class DatastoreIO {
   private DatastoreIO() {}
 
   /**
-   * Returns a {@link V1Beta3} that provides an API for accessing Datastore through v1beta3 version
+   * Returns a {@link DatastoreV1} that provides an API for accessing Datastore through the v1
+   * version of the Datastore client library.
    */
-  public static V1Beta3 v1beta3() {
-    return new V1Beta3();
+  public static DatastoreV1 v1() {
+    return new DatastoreV1();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
new file mode 100644
index 0000000..852595a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -0,0 +1,1034 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Verify.verify;
+import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Key.PathElement;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
+import com.google.datastore.v1.client.DatastoreFactory;
+import com.google.datastore.v1.client.DatastoreHelper;
+import com.google.datastore.v1.client.DatastoreOptions;
+import com.google.datastore.v1.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
+/**
+ * <p>{@link DatastoreV1} provides an API to Read, Write and Delete {@link PCollection PCollections}
+ * of <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1
+ * {@link Entity} objects.
+ *
+ * <p>This API currently requires an authentication workaround. To use {@link DatastoreV1}, users
+ * must use the {@code gcloud} command line tool to get credentials for Datastore:
+ * <pre>
+ * $ gcloud auth login
+ * </pre>
+ *
+ * <p>To read a {@link PCollection} from a query to Datastore, use {@link DatastoreV1#read} and
+ * its methods {@link DatastoreV1.Read#withProjectId} and {@link DatastoreV1.Read#withQuery} to
+ * specify the project to query and the query to read from. You can optionally provide a namespace
+ * to query within using {@link DatastoreV1.Read#withNamespace}, and you can specify how many
+ * splits you want for the query using {@link DatastoreV1.Read#withNumQuerySplits}.
+ *
+ * <p>For example:
+ *
+ * <pre> {@code
+ * // Read a query from Datastore
+ * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+ * Query query = ...;
+ * String projectId = "...";
+ *
+ * Pipeline p = Pipeline.create(options);
+ * PCollection<Entity> entities = p.apply(
+ *     DatastoreIO.v1().read()
+ *         .withProjectId(projectId)
+ *         .withQuery(query));
+ * } </pre>
+ *
+ * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
+ * many workers. However, when the {@link Query} is configured with a limit using
+ * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)}, then
+ * all returned results will be read by a single Dataflow worker in order to ensure correct data.
+ *
+ * <p>To write a {@link PCollection} to a Datastore, use {@link DatastoreV1#write},
+ * specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1().write().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use
+ * {@link DatastoreV1#deleteEntity()}, specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1().deleteEntity().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore,
+ * use {@link DatastoreV1#deleteKey}, specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Key> keys = ...;
+ * keys.apply(DatastoreIO.v1().deleteKey().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete
+ * {@link Key Keys}. Complete {@code Keys} specify the {@code name} or {@code id} of the
+ * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than the
+ * project default may be used by specifying it in the {@code Entity} {@code Keys}:
+ *
+ * <pre>{@code
+ * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
+ * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+ * }</pre>
+ *
+ * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please
+ * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties,
+ * and Keys</a> for more information about {@code Entity} keys.
+ *
+ * <h3>Permissions</h3>
+ * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
+ * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
+ * more details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up
+ * </a> for security- and permission-related information specific to Datastore.
+ *
+ * @see org.apache.beam.sdk.runners.PipelineRunner
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class DatastoreV1 {
+
+  // A package-private constructor to prevent direct instantiation from outside of this package
+  DatastoreV1() {}
+
+  /**
+   * Datastore has a limit of 500 mutations per batch operation, so we flush
+   * changes to Datastore every 500 entities.
+   */
+  @VisibleForTesting
+  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+
+  /**
+   * Returns an empty {@link DatastoreV1.Read} builder. Configure the source {@code projectId},
+   * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
+   * {@link DatastoreV1.Read#withProjectId}, {@link DatastoreV1.Read#withQuery},
+   * {@link DatastoreV1.Read#withNamespace}, {@link DatastoreV1.Read#withNumQuerySplits}.
+   */
+  public DatastoreV1.Read read() {
+    return new DatastoreV1.Read(null, null, null, 0);
+  }
+
+  /**
+   * A {@link PTransform} that reads the results of a Datastore query as {@code Entity}
+   * objects.
+   *
+   * @see DatastoreIO
+   */
+  public static class Read extends PTransform<PBegin, PCollection<Entity>> {
+    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+    /** An upper bound on the number of splits for a query. */
+    public static final int NUM_QUERY_SPLITS_MAX = 50000;
+
+    /** A lower bound on the number of splits for a query. */
+    static final int NUM_QUERY_SPLITS_MIN = 12;
+
+    /** Default bundle size of 64MB. */
+    static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024;
+
+    /**
+     * Maximum number of results to request per query.
+     *
+     * <p>Must be set, or it may result in an I/O error when querying Cloud Datastore.
+     */
+    static final int QUERY_BATCH_LIMIT = 500;
+
+    @Nullable
+    private final String projectId;
+
+    @Nullable
+    private final Query query;
+
+    @Nullable
+    private final String namespace;
+
+    private final int numQuerySplits;
+
+    /**
+     * Computes the number of splits to be performed on the given query by querying the estimated
+     * size from Datastore.
+     */
+    static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
+      int numSplits;
+      try {
+        long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
+        numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX,
+            Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES));
+      } catch (Exception e) {
+        LOG.warn("Failed to fetch estimatedSizeBytes for query: {}", query, e);
+        // Fallback in case estimated size is unavailable.
+        numSplits = NUM_QUERY_SPLITS_MIN;
+      }
+      return Math.max(numSplits, NUM_QUERY_SPLITS_MIN);
+    }
+
+    /**
+     * Get the estimated size of the data returned by the given query.
+     *
+     * <p>Datastore provides no way to get a good estimate of how large the result of a query
+     * will be. As a rough approximation, we fetch the statistics for the entire
+     * entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind
+     * is specified in the query.
+     *
+     * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
+     */
+    static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
+        throws DatastoreException {
+      String ourKind = query.getKind(0).getName();
+      Query.Builder queryBuilder = Query.newBuilder();
+      if (namespace == null) {
+        queryBuilder.addKindBuilder().setName("__Stat_Kind__");
+      } else {
+        queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__");
+      }
+      queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
+
+      // Get the latest statistics
+      queryBuilder.addOrder(makeOrder("timestamp", DESCENDING));
+      queryBuilder.setLimit(Int32Value.newBuilder().setValue(1));
+
+      RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+
+      long now = System.currentTimeMillis();
+      RunQueryResponse response = datastore.runQuery(request);
+      LOG.debug("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
+
+      QueryResultBatch batch = response.getBatch();
+      if (batch.getEntityResultsCount() == 0) {
+        throw new NoSuchElementException(
+            "Datastore statistics for kind " + ourKind + " unavailable");
+      }
+      Entity entity = batch.getEntityResults(0).getEntity();
+      return entity.getProperties().get("entity_bytes").getIntegerValue();
+    }
+
+    /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */
+    static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
+      RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
+      if (namespace != null) {
+        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+      }
+      return requestBuilder.build();
+    }
+
+    /**
+     * A helper function to get the split queries, taking into account the optional
+     * {@code namespace}.
+     */
+    private static List<Query> splitQuery(Query query, @Nullable String namespace,
+        Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException {
+      // If namespace is set, include it in the split request so splits are calculated accordingly.
+      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
+      if (namespace != null) {
+        partitionBuilder.setNamespaceId(namespace);
+      }
+
+      return querySplitter.getSplits(query, partitionBuilder.build(), numSplits, datastore);
+    }
+
+    /**
+     * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
+     * {@code null} as a matter of build order, but if they are {@code null} at instantiation time,
+     * an error will be thrown.
+     */
+    private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace,
+        int numQuerySplits) {
+      this.projectId = projectId;
+      this.query = query;
+      this.namespace = namespace;
+      this.numQuerySplits = numQuerySplits;
+    }
+
+    /**
+     * Returns a new {@link DatastoreV1.Read} that reads from the Datastore for the specified
+     * project.
+     */
+    public DatastoreV1.Read withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new DatastoreV1.Read(projectId, query, namespace, numQuerySplits);
+    }
+
+    /**
+     * Returns a new {@link DatastoreV1.Read} that reads the results of the specified query.
+     *
+     * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
+     * across many workers. However, when the {@link Query} is configured with a limit using
+     * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
+     * to ensure correct results.
+     */
+    public DatastoreV1.Read withQuery(Query query) {
+      checkNotNull(query, "query");
+      checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
+          "Invalid query limit %s: must be positive", query.getLimit().getValue());
+      return new DatastoreV1.Read(projectId, query, namespace, numQuerySplits);
+    }
+
+    /**
+     * Returns a new {@link DatastoreV1.Read} that reads from the given namespace.
+     */
+    public DatastoreV1.Read withNamespace(String namespace) {
+      return new DatastoreV1.Read(projectId, query, namespace, numQuerySplits);
+    }
+
+    /**
+     * Returns a new {@link DatastoreV1.Read} that reads by splitting the given {@code query} into
+     * {@code numQuerySplits}.
+     *
+     * <p>The semantics for the query splitting is defined below:
+     * <ul>
+     *   <li>Any value less than or equal to 0 will be ignored, and the number of splits will be
+     *   chosen dynamically at runtime based on the query data size.
+     *   <li>Any value greater than {@link Read#NUM_QUERY_SPLITS_MAX} will be capped at
+     *   {@code NUM_QUERY_SPLITS_MAX}.
+     *   <li>If the {@code query} has a user limit set, then {@code numQuerySplits} will be
+     *   ignored and no split will be performed.
+     *   <li>In certain cases Cloud Datastore is unable to split a query into the requested number
+     *   of splits. In such cases, we use however many splits Datastore returns.
+     * </ul>
+     */
+    public DatastoreV1.Read withNumQuerySplits(int numQuerySplits) {
+      return new DatastoreV1.Read(projectId, query, namespace,
+          Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX));
+    }
+
+    @Nullable
+    public Query getQuery() {
+      return query;
+    }
+
+    @Nullable
+    public String getProjectId() {
+      return projectId;
+    }
+
+    @Nullable
+    public String getNamespace() {
+      return namespace;
+    }
+
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public PCollection<Entity> apply(PBegin input) {
+      V1Options v1Options = V1Options.from(getProjectId(), getQuery(),
+          getNamespace());
+
+      /*
+       * This composite transform involves the following steps:
+       *   1. Create a singleton of the user provided {@code query} and apply a {@link ParDo} that
+       *   splits the query into {@code numQuerySplits} and assign each split query a unique
+       *   {@code Integer} as the key. The resulting output is of the type
+       *   {@code PCollection<KV<Integer, Query>>}.
+       *
+       *   If the value of {@code numQuerySplits} is less than or equal to 0, then the number of
+       *   splits will be computed dynamically based on the size of the data for the {@code query}.
+       *
+       *   2. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The
+       *   queries are extracted from the {@code KV<Integer, Iterable<Query>>} and flattened to
+       *   output a {@code PCollection<Query>}.
+       *
+       *   3. In the third step, a {@code ParDo} reads entities for each query and outputs
+       *   a {@code PCollection<Entity>}.
+       */
+      PCollection<KV<Integer, Query>> queries = input
+          .apply(Create.of(query))
+          .apply(ParDo.of(new SplitQueryFn(v1Options, numQuerySplits)));
+
+      PCollection<Query> shardedQueries = queries
+          .apply(GroupByKey.<Integer, Query>create())
+          .apply(Values.<Iterable<Query>>create())
+          .apply(Flatten.<Query>iterables());
+
+      PCollection<Entity> entities = shardedQueries
+          .apply(ParDo.of(new ReadFn(v1Options)));
+
+      return entities;
+    }
+
+    @Override
+    public void validate(PBegin input) {
+      checkNotNull(projectId, "projectId");
+      checkNotNull(query, "query");
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("ProjectId"))
+          .addIfNotNull(DisplayData.item("namespace", namespace)
+              .withLabel("Namespace"))
+          .addIfNotNull(DisplayData.item("query", query.toString())
+              .withLabel("Query"));
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .add("query", query)
+          .add("namespace", namespace)
+          .toString();
+    }
+
+    /**
+     * A class for v1 Datastore related options.
+     */
+    @VisibleForTesting
+    static class V1Options implements Serializable {
+      private final Query query;
+      private final String projectId;
+      @Nullable
+      private final String namespace;
+
+      private V1Options(String projectId, Query query, @Nullable String namespace) {
+        this.projectId = checkNotNull(projectId, "projectId");
+        this.query = checkNotNull(query, "query");
+        this.namespace = namespace;
+      }
+
+      public static V1Options from(String projectId, Query query, @Nullable String namespace) {
+        return new V1Options(projectId, query, namespace);
+      }
+
+      public Query getQuery() {
+        return query;
+      }
+
+      public String getProjectId() {
+        return projectId;
+      }
+
+      @Nullable
+      public String getNamespace() {
+        return namespace;
+      }
+    }
+
+    /**
+     * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
+     * keys and outputs them as {@link KV}.
+     */
+    @VisibleForTesting
+    static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
+      private final V1Options options;
+      // number of splits to make for a given query
+      private final int numSplits;
+
+      private final V1DatastoreFactory datastoreFactory;
+      // Datastore client
+      private transient Datastore datastore;
+      // Query splitter
+      private transient QuerySplitter querySplitter;
+
+      public SplitQueryFn(V1Options options, int numSplits) {
+        this(options, numSplits, new V1DatastoreFactory());
+      }
+
+      @VisibleForTesting
+      SplitQueryFn(V1Options options, int numSplits,
+          V1DatastoreFactory datastoreFactory) {
+        this.options = options;
+        this.numSplits = numSplits;
+        this.datastoreFactory = datastoreFactory;
+      }
+
+      @StartBundle
+      public void startBundle(Context c) throws Exception {
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId);
+        querySplitter = datastoreFactory.getQuerySplitter();
+      }
+
+      @ProcessElement
+      public void processElement(ProcessContext c) throws Exception {
+        int key = 1;
+        Query query = c.element();
+
+        // If query has a user set limit, then do not split.
+        if (query.hasLimit()) {
+          c.output(KV.of(key, query));
+          return;
+        }
+
+        int estimatedNumSplits;
+        // Compute the estimated numSplits if numSplits is not specified by the user.
+        if (numSplits <= 0) {
+          estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
+        } else {
+          estimatedNumSplits = numSplits;
+        }
+
+        List<Query> querySplits;
+        try {
+          querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter,
+              estimatedNumSplits);
+        } catch (Exception e) {
+          LOG.warn("Unable to parallelize the given query: {}", query, e);
+          querySplits = ImmutableList.of(query);
+        }
+
+        // assign unique keys to query splits.
+        for (Query subquery : querySplits) {
+          c.output(KV.of(key++, subquery));
+        }
+      }
+
+      @Override
+      public void populateDisplayData(Builder builder) {
+        super.populateDisplayData(builder);
+        builder
+            .addIfNotNull(DisplayData.item("projectId", options.getProjectId())
+                .withLabel("ProjectId"))
+            .addIfNotNull(DisplayData.item("namespace", options.getNamespace())
+                .withLabel("Namespace"))
+            .addIfNotNull(DisplayData.item("query", options.getQuery().toString())
+                .withLabel("Query"));
+      }
+    }
+
+    /**
+     * A {@link DoFn} that reads entities from Datastore for each query.
+     */
+    @VisibleForTesting
+    static class ReadFn extends DoFn<Query, Entity> {
+      private final V1Options options;
+      private final V1DatastoreFactory datastoreFactory;
+      // Datastore client
+      private transient Datastore datastore;
+
+      public ReadFn(V1Options options) {
+        this(options, new V1DatastoreFactory());
+      }
+
+      @VisibleForTesting
+      ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) {
+        this.options = options;
+        this.datastoreFactory = datastoreFactory;
+      }
+
+      @StartBundle
+      public void startBundle(Context c) throws Exception {
+        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
+      }
+
+      /** Read and output entities for the given query. */
+      @ProcessElement
+      public void processElement(ProcessContext context) throws Exception {
+        Query query = context.element();
+        String namespace = options.getNamespace();
+        int userLimit = query.hasLimit()
+            ? query.getLimit().getValue() : Integer.MAX_VALUE;
+
+        boolean moreResults = true;
+        QueryResultBatch currentBatch = null;
+
+        while (moreResults) {
+          Query.Builder queryBuilder = query.toBuilder().clone();
+          queryBuilder.setLimit(Int32Value.newBuilder().setValue(
+              Math.min(userLimit, QUERY_BATCH_LIMIT)));
+
+          if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+            queryBuilder.setStartCursor(currentBatch.getEndCursor());
+          }
+
+          RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+          RunQueryResponse response = datastore.runQuery(request);
+
+          currentBatch = response.getBatch();
+
+          // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
+          // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
+          // use result count to determine if more results might exist.
+          int numFetch = currentBatch.getEntityResultsCount();
+          if (query.hasLimit()) {
+            verify(userLimit >= numFetch,
+                "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit",
+                userLimit, numFetch, query.getLimit());
+            userLimit -= numFetch;
+          }
+
+          // output all the entities from the current batch.
+          for (EntityResult entityResult : currentBatch.getEntityResultsList()) {
+            context.output(entityResult.getEntity());
+          }
+
+          // Check if we have more entities to be read.
+          moreResults =
+              // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied
+              (userLimit > 0)
+                  // All indications from the API are that there are/may be more results.
+                  && ((numFetch == QUERY_BATCH_LIMIT)
+                  || (currentBatch.getMoreResults() == NOT_FINISHED));
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns an empty {@link DatastoreV1.Write} builder. Configure the destination
+   * {@code projectId} using {@link DatastoreV1.Write#withProjectId}.
+   */
+  public Write write() {
+    return new Write(null);
+  }
+
+  /**
+   * Returns an empty {@link DeleteEntity} builder. Configure the destination
+   * {@code projectId} using {@link DeleteEntity#withProjectId}.
+   */
+  public DeleteEntity deleteEntity() {
+    return new DeleteEntity(null);
+  }
+
+  /**
+   * Returns an empty {@link DeleteKey} builder. Configure the destination
+   * {@code projectId} using {@link DeleteKey#withProjectId}.
+   */
+  public DeleteKey deleteKey() {
+    return new DeleteKey(null);
+  }
+
+  /**
+   * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
+   *
+   * @see DatastoreIO
+   */
+  public static class Write extends Mutate<Entity> {
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
+     */
+    Write(@Nullable String projectId) {
+      super(projectId, new UpsertFn());
+    }
+
+    /**
+     * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project.
+     */
+    public Write withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new Write(projectId);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore.
+   *
+   * @see DatastoreIO
+   */
+  public static class DeleteEntity extends Mutate<Entity> {
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
+     */
+    DeleteEntity(@Nullable String projectId) {
+      super(projectId, new DeleteEntityFn());
+    }
+
+    /**
+     * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the
+     * specified project.
+     */
+    public DeleteEntity withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new DeleteEntity(projectId);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that deletes {@link Entity Entities} associated with the given
+   * {@link Key Keys} from Cloud Datastore.
+   *
+   * @see DatastoreIO
+   */
+  public static class DeleteKey extends Mutate<Key> {
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
+     */
+    DeleteKey(@Nullable String projectId) {
+      super(projectId, new DeleteKeyFn());
+    }
+
+    /**
+     * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the
+     * specified project.
+     */
+    public DeleteKey withProjectId(String projectId) {
+      checkNotNull(projectId, "projectId");
+      return new DeleteKey(projectId);
+    }
+  }
+
+  /**
+   * A {@link PTransform} that writes mutations to Cloud Datastore.
+   *
+   * <p>It requires a {@link DoFn} that transforms an object of type {@code T} to a {@link Mutation};
+   * {@code T} is usually either an {@link Entity} or a {@link Key}.
+   * <b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete) should
+   * be used by the {@code DoFn} provided, as the commits are retried when failures occur.
+   */
+  private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
+    @Nullable
+    private final String projectId;
+    /** A function that transforms each {@code T} into a mutation. */
+    private final SimpleFunction<T, Mutation> mutationFn;
+
+    /**
+     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+     * it is {@code null} at instantiation time, an error will be thrown.
+     */
+    Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
+      this.projectId = projectId;
+      this.mutationFn = checkNotNull(mutationFn);
+    }
+
+    @Override
+    public PDone apply(PCollection<T> input) {
+      input.apply("Convert to Mutation", MapElements.via(mutationFn))
+          .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
+
+      return PDone.in(input.getPipeline());
+    }
+
+    @Override
+    public void validate(PCollection<T> input) {
+      checkNotNull(projectId, "projectId");
+      checkNotNull(mutationFn, "mutationFn");
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("projectId", projectId)
+          .add("mutationFn", mutationFn.getClass().getName())
+          .toString();
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("Output Project"))
+          .include(mutationFn);
+    }
+
+    public String getProjectId() {
+      return projectId;
+    }
+  }
+
+  /**
+   * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
+   * batches, where the maximum batch size is {@link DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}.
+   *
+   * <p>See <a
+   * href="https://cloud.google.com/datastore/docs/concepts/entities">
+   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations.
+   *
+   * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
+   * group, the commit will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+   * times). This means that the mutation operation should be idempotent. Thus, the writer should
+   * only be used for {@code upsert} and {@code delete} mutation operations, as these are the only
+   * two Cloud Datastore mutations that are idempotent.
+   */
+  @VisibleForTesting
+  static class DatastoreWriterFn extends DoFn<Mutation, Void> {
+    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
+    private final String projectId;
+    private transient Datastore datastore;
+    private final V1DatastoreFactory datastoreFactory;
+    // Current batch of mutations to be written.
+    private final List<Mutation> mutations = new ArrayList<>();
+    /**
+     * Since a bundle is written in batches, we should retry the commit of a batch in order to
+     * prevent transient errors from causing the bundle to fail.
+     */
+    private static final int MAX_RETRIES = 5;
+
+    /**
+     * Initial backoff time for exponential backoff for retry attempts.
+     */
+    private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+    DatastoreWriterFn(String projectId) {
+      this(projectId, new V1DatastoreFactory());
+    }
+
+    @VisibleForTesting
+    DatastoreWriterFn(String projectId, V1DatastoreFactory datastoreFactory) {
+      this.projectId = checkNotNull(projectId, "projectId");
+      this.datastoreFactory = datastoreFactory;
+    }
+
+    @StartBundle
+    public void startBundle(Context c) {
+      datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      mutations.add(c.element());
+      if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
+        flushBatch();
+      }
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (mutations.size() > 0) {
+        flushBatch();
+      }
+    }
+
+    /**
+     * Writes a batch of mutations to Cloud Datastore.
+     *
+     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
+     * times). All mutations in the batch will be committed again, even if the commit was partially
+     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
+     * thrown.
+     *
+     * @throws DatastoreException if the commit fails
+     * @throws IOException if backing off between retries fails
+     * @throws InterruptedException if backing off between retries is interrupted
+     */
+    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+      LOG.debug("Writing batch of {} mutations", mutations.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+      while (true) {
+        // Batch upsert entities.
+        try {
+          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+          commitRequest.addAllMutations(mutations);
+          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+          datastore.commit(commitRequest.build());
+          // Break if the commit threw no exception.
+          break;
+        } catch (DatastoreException exception) {
+          // Only log the code and message for potentially-transient errors. The entire exception
+          // will be propagated upon the last retry.
+          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      LOG.debug("Successfully wrote {} mutations", mutations.size());
+      mutations.clear();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      super.populateDisplayData(builder);
+      builder
+          .addIfNotNull(DisplayData.item("projectId", projectId)
+              .withLabel("Output Project"));
+    }
+  }
+
+  /**
+   * Returns true if a Datastore key is complete. A key is complete if its last element
+   * has either an id or a name.
+   */
+  static boolean isValidKey(Key key) {
+    List<PathElement> elementList = key.getPathList();
+    if (elementList.isEmpty()) {
+      return false;
+    }
+    PathElement lastElement = elementList.get(elementList.size() - 1);
+    return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+  }
+
+  /**
+   * A function that constructs an upsert {@link Mutation} from an {@link Entity}.
+   */
+  @VisibleForTesting
+  static class UpsertFn extends SimpleFunction<Entity, Mutation> {
+    @Override
+    public Mutation apply(Entity entity) {
+      // Verify that the entity to write has a complete key.
+      checkArgument(isValidKey(entity.getKey()),
+          "Entities to be written to the Datastore must have complete keys:\n%s", entity);
+
+      return makeUpsert(entity).build();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder.add(DisplayData.item("upsertFn", this.getClass())
+          .withLabel("Create Upsert Mutation"));
+    }
+  }
+
+  /**
+   * A function that constructs a delete {@link Mutation} from an {@link Entity}.
+   */
+  @VisibleForTesting
+  static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> {
+    @Override
+    public Mutation apply(Entity entity) {
+      // Verify that the entity to delete has a complete key.
+      checkArgument(isValidKey(entity.getKey()),
+          "Entities to be deleted from the Datastore must have complete keys:\n%s", entity);
+
+      return makeDelete(entity.getKey()).build();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder.add(DisplayData.item("deleteEntityFn", this.getClass())
+          .withLabel("Create Delete Mutation"));
+    }
+  }
+
+  /**
+   * A function that constructs a delete {@link Mutation} from a {@link Key}.
+   */
+  @VisibleForTesting
+  static class DeleteKeyFn extends SimpleFunction<Key, Mutation> {
+    @Override
+    public Mutation apply(Key key) {
+      // Verify that the key to delete is complete.
+      checkArgument(isValidKey(key),
+          "Keys to be deleted from the Datastore must be complete:\n%s", key);
+
+      return makeDelete(key).build();
+    }
+
+    @Override
+    public void populateDisplayData(Builder builder) {
+      builder.add(DisplayData.item("deleteKeyFn", this.getClass())
+          .withLabel("Create Delete Mutation"));
+    }
+  }
+
+  /**
+   * A wrapper factory class for the Datastore singleton classes {@link DatastoreFactory} and
+   * {@link QuerySplitter}.
+   *
+   * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not Java-serializable, so they are
+   * wrapped in this class, which implements {@link Serializable}.
+   */
+  @VisibleForTesting
+  static class V1DatastoreFactory implements Serializable {
+
+    /** Builds a Datastore client for the given pipeline options and project. */
+    public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+      DatastoreOptions.Builder builder =
+          new DatastoreOptions.Builder()
+              .projectId(projectId)
+              .initializer(
+                  new RetryHttpRequestInitializer()
+              );
+
+      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+      if (credential != null) {
+        builder.credential(credential);
+      }
+
+      return DatastoreFactory.get().create(builder.build());
+    }
+
+    /** Builds a Datastore {@link QuerySplitter}. */
+    public QuerySplitter getQuerySplitter() {
+      return DatastoreHelper.getQuerySplitter();
+    }
+  }
+}
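
Taken together with the class javadoc above, a minimal end-to-end sketch of the renamed
transforms (project id and kind are placeholders; credentials are assumed to be set up via
"gcloud auth login" as the javadoc describes):

    import com.google.datastore.v1.Entity;
    import com.google.datastore.v1.Query;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class DatastoreV1Sketch {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline p = Pipeline.create(options);

        String projectId = "my-project";           // placeholder
        Query.Builder query = Query.newBuilder();
        query.addKindBuilder().setName("MyKind");  // placeholder kind

        // Read: the source computes query splits from Datastore statistics unless
        // withNumQuerySplits is set; a query with a limit is read by a single worker.
        PCollection<Entity> entities = p.apply(
            DatastoreIO.v1().read()
                .withProjectId(projectId)
                .withQuery(query.build()));

        // Write (upsert): mutations are committed non-transactionally in batches of up
        // to DATASTORE_BATCH_UPDATE_LIMIT (500) and retried on transient failure, so
        // only idempotent mutations (upsert and delete) are safe.
        entities.apply(DatastoreIO.v1().write().withProjectId(projectId));

        p.run();
      }
    }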


[3/4] incubator-beam git commit: DatastoreIO v1beta3 to v1

Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
deleted file mode 100644
index 8503b66..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ /dev/null
@@ -1,1033 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreHelper;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-
-/**
- * <p>{@link V1Beta3} provides an API to Read, Write and Delete {@link PCollection PCollections} of
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
- * {@link Entity} objects.
- *
- * <p>This API currently requires an authentication workaround. To use {@link V1Beta3}, users
- * must use the {@code gcloud} command line tool to get credentials for Datastore:
- * <pre>
- * $ gcloud auth login
- * </pre>
- *
- * <p>To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and
- * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to
- * specify the project to query and the query to read from. You can optionally provide a namespace
- * to query within using {@link V1Beta3.Read#withNamespace}. You could also optionally specify
- * how many splits you want for the query using {@link V1Beta3.Read#withNumQuerySplits}.
- *
- * <p>For example:
- *
- * <pre> {@code
- * // Read a query from Datastore
- * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- * Query query = ...;
- * String projectId = "...";
- *
- * Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(
- *     DatastoreIO.v1beta3().read()
- *         .withProjectId(projectId)
- *         .withQuery(query));
- * } </pre>
- *
- * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
- * many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
- * all returned results will be read by a single Dataflow worker in order to ensure correct data.
- *
- * <p>To write a {@link PCollection} to a Datastore, use {@link V1Beta3#write},
- * specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use
- * {@link V1Beta3#deleteEntity()}, specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.v1beta3().deleteEntity().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore,
- * use {@link V1Beta3#deleteKey}, specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.v1beta3().deleteKey().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete
- * {@link Key Keys}. Complete {@code Keys} specify the {@code name} or {@code id} of the
- * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than the
- * {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}.
- *
- * <pre>{@code
- * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
- * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
- * }</pre>
- *
- * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please
- * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties,
- * and Keys</a> for more information about {@code Entity} keys.
- *
- * <h3>Permissions</h3>
- * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
- * more details.
- *
- * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up
- * </a>for security and permission related information specific to Datastore.
- *
- * @see org.apache.beam.sdk.runners.PipelineRunner
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class V1Beta3 {
-
-  // A package-private constructor to prevent direct instantiation from outside of this package
-  V1Beta3() {}
-
-  /**
-   * Datastore has a limit of 500 mutations per batch operation, so we flush
-   * changes to Datastore every 500 entities.
-   */
-  @VisibleForTesting
-  static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
-
-  /**
-   * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
-   * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
-   * {@link V1Beta3.Read#withProjectId}, {@link V1Beta3.Read#withQuery},
-   * {@link V1Beta3.Read#withNamespace}, {@link V1Beta3.Read#withNumQuerySplits}.
-   */
-  public V1Beta3.Read read() {
-    return new V1Beta3.Read(null, null, null, 0);
-  }
-
-  /**
-   * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity}
-   * objects.
-   *
-   * @see DatastoreIO
-   */
-  public static class Read extends PTransform<PBegin, PCollection<Entity>> {
-    private static final Logger LOG = LoggerFactory.getLogger(Read.class);
-
-    /** An upper bound on the number of splits for a query. */
-    public static final int NUM_QUERY_SPLITS_MAX = 50000;
-
-    /** A lower bound on the number of splits for a query. */
-    static final int NUM_QUERY_SPLITS_MIN = 12;
-
-    /** Default bundle size of 64MB. */
-    static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024;
-
-    /**
-     * Maximum number of results to request per query.
-     *
-     * <p>Must be set, or it may result in an I/O error when querying Cloud Datastore.
-     */
-    static final int QUERY_BATCH_LIMIT = 500;
-
-    @Nullable
-    private final String projectId;
-
-    @Nullable
-    private final Query query;
-
-    @Nullable
-    private final String namespace;
-
-    private final int numQuerySplits;
-
-    /**
-     * Computes the number of splits to be performed on the given query by querying the estimated
-     * size from Datastore.
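-     *
-     * <p>For example (illustrative numbers): an estimated size of 6400 MB with the default
-     * 64 MB bundle size yields {@code min(50000, round(6400 / 64)) = 100} splits, which is
-     * then clamped below by {@code NUM_QUERY_SPLITS_MIN} (12).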
-     */
-    static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
-      int numSplits;
-      try {
-        long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
-        numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX,
-            Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES));
-      } catch (Exception e) {
-        LOG.warn("Failed the fetch estimatedSizeBytes for query: {}", query, e);
-        // Fallback in case estimated size is unavailable.
-        numSplits = NUM_QUERY_SPLITS_MIN;
-      }
-      return Math.max(numSplits, NUM_QUERY_SPLITS_MIN);
-    }
-
-    /**
-     * Get the estimated size of the data returned by the given query.
-     *
-     * <p>Datastore provides no way to get a good estimate of how large the result of a query
-     * will be. As a rough approximation, we attempt to fetch the statistics of the entire
-     * entity kind being queried, using the __Stat_Kind__ system table, assuming exactly 1 kind
-     * is specified in the query.
-     *
-     * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
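-     *
-     * <p>Illustratively, for kind {@code Foo} with no namespace, the generated query is roughly
-     * equivalent to the GQL {@code SELECT * FROM __Stat_Kind__ WHERE kind_name = 'Foo'
-     * ORDER BY timestamp DESC LIMIT 1}; the {@code entity_bytes} property of the single result
-     * holds the estimated size.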
-     */
-    static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
-        throws DatastoreException {
-      String ourKind = query.getKind(0).getName();
-      Query.Builder queryBuilder = Query.newBuilder();
-      if (namespace == null) {
-        queryBuilder.addKindBuilder().setName("__Stat_Kind__");
-      } else {
-        queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__");
-      }
-      queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
-
-      // Get the latest statistics
-      queryBuilder.addOrder(makeOrder("timestamp", DESCENDING));
-      queryBuilder.setLimit(Int32Value.newBuilder().setValue(1));
-
-      RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
-
-      long now = System.currentTimeMillis();
-      RunQueryResponse response = datastore.runQuery(request);
-      LOG.debug("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
-
-      QueryResultBatch batch = response.getBatch();
-      if (batch.getEntityResultsCount() == 0) {
-        throw new NoSuchElementException(
-            "Datastore statistics for kind " + ourKind + " unavailable");
-      }
-      Entity entity = batch.getEntityResults(0).getEntity();
-      return entity.getProperties().get("entity_bytes").getIntegerValue();
-    }
-
-    /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */
-    static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
-      RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
-      if (namespace != null) {
-        requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-      }
-      return requestBuilder.build();
-    }
-
-    /**
-     * A helper function to get the split queries, taking into account the optional
-     * {@code namespace}.
-     */
-    private static List<Query> splitQuery(Query query, @Nullable String namespace,
-        Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException {
-      // If namespace is set, include it in the split request so splits are calculated accordingly.
-      PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
-      if (namespace != null) {
-        partitionBuilder.setNamespaceId(namespace);
-      }
-
-      return querySplitter.getSplits(query, partitionBuilder.build(), numSplits, datastore);
-    }
-
-    /**
-     * Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
-     * {@code null} as a matter of build order, but if they are {@code null} at instantiation time,
-     * an error will be thrown.
-     */
-    private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace,
-        int numQuerySplits) {
-      this.projectId = projectId;
-      this.query = query;
-      this.namespace = namespace;
-      this.numQuerySplits = numQuerySplits;
-    }
-
-    /**
-     * Returns a new {@link V1Beta3.Read} that reads from the Datastore for the specified project.
-     */
-    public V1Beta3.Read withProjectId(String projectId) {
-      checkNotNull(projectId, "projectId");
-      return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
-    }
-
-    /**
-     * Returns a new {@link V1Beta3.Read} that reads the results of the specified query.
-     *
-     * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
-     * across many workers. However, when the {@link Query} is configured with a limit using
-     * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
-     * to ensure correct results.
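-     *
-     * <p>For example, a minimal sketch (values hypothetical) of a query with a limit, which
-     * forces a single-worker read:
-     *
-     * <pre> {@code
-     * Query limited = query.toBuilder()
-     *     .setLimit(Int32Value.newBuilder().setValue(100))
-     *     .build();
-     * } </pre>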
-     */
-    public V1Beta3.Read withQuery(Query query) {
-      checkNotNull(query, "query");
-      checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
-          "Invalid query limit %s: must be positive", query.getLimit().getValue());
-      return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
-    }
-
-    /**
-     * Returns a new {@link V1Beta3.Read} that reads from the given namespace.
-     */
-    public V1Beta3.Read withNamespace(String namespace) {
-      return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
-    }
-
-    /**
-     * Returns a new {@link V1Beta3.Read} that reads by splitting the given {@code query} into
-     * {@code numQuerySplits}.
-     *
-     * <p>The semantics for the query splitting is defined below:
-     * <ul>
-     *   <li>Any value less than or equal to 0 will be ignored, and the number of splits will be
-     *   chosen dynamically at runtime based on the query data size.
-     *   <li>Any value greater than {@link Read#NUM_QUERY_SPLITS_MAX} will be capped at
-     *   {@code NUM_QUERY_SPLITS_MAX}.
-     *   <li>If the {@code query} has a user limit set, then {@code numQuerySplits} will be
-     *   ignored and no split will be performed.
-     *   <li>In some cases Cloud Datastore is unable to split the query into the requested
-     *   number of splits; in such cases, whatever number of splits Datastore returns is used.
-     * </ul>
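-     *
-     * <p>A minimal sketch (values hypothetical):
-     *
-     * <pre> {@code
-     * PCollection<Entity> entities = p.apply(
-     *     DatastoreIO.v1beta3().read()
-     *         .withProjectId("my-project")
-     *         .withQuery(query)
-     *         .withNumQuerySplits(100));
-     * } </pre>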
-     */
-    public V1Beta3.Read withNumQuerySplits(int numQuerySplits) {
-      return new V1Beta3.Read(projectId, query, namespace,
-          Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX));
-    }
-
-    @Nullable
-    public Query getQuery() {
-      return query;
-    }
-
-    @Nullable
-    public String getProjectId() {
-      return projectId;
-    }
-
-    @Nullable
-    public String getNamespace() {
-      return namespace;
-    }
-
-
-    /**
-     * {@inheritDoc}
-     */
-    @Override
-    public PCollection<Entity> apply(PBegin input) {
-      V1Beta3Options v1Beta3Options = V1Beta3Options.from(getProjectId(), getQuery(),
-          getNamespace());
-
-      /*
-       * This composite transform involves the following steps:
-       *   1. Create a singleton of the user provided {@code query} and apply a {@link ParDo} that
-       *   splits the query into {@code numQuerySplits} and assign each split query a unique
-       *   {@code Integer} as the key. The resulting output is of the type
-       *   {@code PCollection<KV<Integer, Query>>}.
-       *
-       *   If the value of {@code numQuerySplits} is less than or equal to 0, then the number of
-       *   splits will be computed dynamically based on the size of the data for the {@code query}.
-       *
-       *   2. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The
-       *   queries are extracted from the {@code KV<Integer, Iterable<Query>>} and flattened to
-       *   output a {@code PCollection<Query>}.
-       *
-       *   3. In the third step, a {@code ParDo} reads entities for each query and outputs
-       *   a {@code PCollection<Entity>}.
-       */
-      PCollection<KV<Integer, Query>> queries = input
-          .apply(Create.of(query))
-          .apply(ParDo.of(new SplitQueryFn(v1Beta3Options, numQuerySplits)));
-
-      PCollection<Query> shardedQueries = queries
-          .apply(GroupByKey.<Integer, Query>create())
-          .apply(Values.<Iterable<Query>>create())
-          .apply(Flatten.<Query>iterables());
-
-      PCollection<Entity> entities = shardedQueries
-          .apply(ParDo.of(new ReadFn(v1Beta3Options)));
-
-      return entities;
-    }
-
-    @Override
-    public void validate(PBegin input) {
-      checkNotNull(projectId, "projectId");
-      checkNotNull(query, "query");
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("ProjectId"))
-          .addIfNotNull(DisplayData.item("namespace", namespace)
-              .withLabel("Namespace"))
-          .addIfNotNull(DisplayData.item("query", query.toString())
-              .withLabel("Query"));
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("projectId", projectId)
-          .add("query", query)
-          .add("namespace", namespace)
-          .toString();
-    }
-
-    /**
-     * A class for v1beta3 Datastore related options.
-     */
-    @VisibleForTesting
-    static class V1Beta3Options implements Serializable {
-      private final Query query;
-      private final String projectId;
-      @Nullable
-      private final String namespace;
-
-      private V1Beta3Options(String projectId, Query query, @Nullable String namespace) {
-        this.projectId = checkNotNull(projectId, "projectId");
-        this.query = checkNotNull(query, "query");
-        this.namespace = namespace;
-      }
-
-      public static V1Beta3Options from(String projectId, Query query, @Nullable String namespace) {
-        return new V1Beta3Options(projectId, query, namespace);
-      }
-
-      public Query getQuery() {
-        return query;
-      }
-
-      public String getProjectId() {
-        return projectId;
-      }
-
-      @Nullable
-      public String getNamespace() {
-        return namespace;
-      }
-    }
-
-    /**
-     * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
-     * keys and outputs them as {@link KV}.
-     */
-    @VisibleForTesting
-    static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
-      private final V1Beta3Options options;
-      // number of splits to make for a given query
-      private final int numSplits;
-
-      private final V1Beta3DatastoreFactory datastoreFactory;
-      // Datastore client
-      private transient Datastore datastore;
-      // Query splitter
-      private transient QuerySplitter querySplitter;
-
-      public SplitQueryFn(V1Beta3Options options, int numSplits) {
-        this(options, numSplits, new V1Beta3DatastoreFactory());
-      }
-
-      @VisibleForTesting
-      SplitQueryFn(V1Beta3Options options, int numSplits,
-          V1Beta3DatastoreFactory datastoreFactory) {
-        this.options = options;
-        this.numSplits = numSplits;
-        this.datastoreFactory = datastoreFactory;
-      }
-
-      @StartBundle
-      public void startBundle(Context c) throws Exception {
-        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.projectId);
-        querySplitter = datastoreFactory.getQuerySplitter();
-      }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        int key = 1;
-        Query query = c.element();
-
-        // If the query has a user-set limit, do not split.
-        if (query.hasLimit()) {
-          c.output(KV.of(key, query));
-          return;
-        }
-
-        int estimatedNumSplits;
-        // Compute the estimated numSplits if numSplits is not specified by the user.
-        if (numSplits <= 0) {
-          estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
-        } else {
-          estimatedNumSplits = numSplits;
-        }
-
-        List<Query> querySplits;
-        try {
-          querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter,
-              estimatedNumSplits);
-        } catch (Exception e) {
-          LOG.warn("Unable to parallelize the given query: {}", query, e);
-          querySplits = ImmutableList.of(query);
-        }
-
-        // assign unique keys to query splits.
-        for (Query subquery : querySplits) {
-          c.output(KV.of(key++, subquery));
-        }
-      }
-
-      @Override
-      public void populateDisplayData(Builder builder) {
-        super.populateDisplayData(builder);
-        builder
-            .addIfNotNull(DisplayData.item("projectId", options.getProjectId())
-                .withLabel("ProjectId"))
-            .addIfNotNull(DisplayData.item("namespace", options.getNamespace())
-                .withLabel("Namespace"))
-            .addIfNotNull(DisplayData.item("query", options.getQuery().toString())
-                .withLabel("Query"));
-      }
-    }
-
-    /**
-     * A {@link DoFn} that reads entities from Datastore for each query.
-     */
-    @VisibleForTesting
-    static class ReadFn extends DoFn<Query, Entity> {
-      private final V1Beta3Options options;
-      private final V1Beta3DatastoreFactory datastoreFactory;
-      // Datastore client
-      private transient Datastore datastore;
-
-      public ReadFn(V1Beta3Options options) {
-        this(options, new V1Beta3DatastoreFactory());
-      }
-
-      @VisibleForTesting
-      ReadFn(V1Beta3Options options, V1Beta3DatastoreFactory datastoreFactory) {
-        this.options = options;
-        this.datastoreFactory = datastoreFactory;
-      }
-
-      @StartBundle
-      public void startBundle(Context c) throws Exception {
-        datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
-      }
-
-      /** Read and output entities for the given query. */
-      @ProcessElement
-      public void processElement(ProcessContext context) throws Exception {
-        Query query = context.element();
-        String namespace = options.getNamespace();
-        int userLimit = query.hasLimit()
-            ? query.getLimit().getValue() : Integer.MAX_VALUE;
-
-        boolean moreResults = true;
-        QueryResultBatch currentBatch = null;
-
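-        // Worked example of the paging below (illustrative): with userLimit = 1200 and
-        // QUERY_BATCH_LIMIT = 500, successive batches request 500, 500, and 200 results,
-        // after which userLimit reaches 0 and the loop exits.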
-        while (moreResults) {
-          Query.Builder queryBuilder = query.toBuilder().clone();
-          queryBuilder.setLimit(Int32Value.newBuilder().setValue(
-              Math.min(userLimit, QUERY_BATCH_LIMIT)));
-
-          if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
-            queryBuilder.setStartCursor(currentBatch.getEndCursor());
-          }
-
-          RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
-          RunQueryResponse response = datastore.runQuery(request);
-
-          currentBatch = response.getBatch();
-
-          // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
-          // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
-          // use result count to determine if more results might exist.
-          int numFetch = currentBatch.getEntityResultsCount();
-          if (query.hasLimit()) {
-            verify(userLimit >= numFetch,
-                "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit",
-                userLimit, numFetch, query.getLimit());
-            userLimit -= numFetch;
-          }
-
-          // output all the entities from the current batch.
-          for (EntityResult entityResult : currentBatch.getEntityResultsList()) {
-            context.output(entityResult.getEntity());
-          }
-
-          // Check if we have more entities to be read.
-          moreResults =
-              // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied
-              (userLimit > 0)
-                  // All indications from the API are that there are/may be more results.
-                  && ((numFetch == QUERY_BATCH_LIMIT)
-                  || (currentBatch.getMoreResults() == NOT_FINISHED));
-        }
-      }
-    }
-  }
-
-  /**
-   * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
-   * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
-   */
-  public Write write() {
-    return new Write(null);
-  }
-
-  /**
-   * Returns an empty {@link DeleteEntity} builder. Configure the destination
-   * {@code projectId} using {@link DeleteEntity#withProjectId}.
-   */
-  public DeleteEntity deleteEntity() {
-    return new DeleteEntity(null);
-  }
-
-  /**
-   * Returns an empty {@link DeleteKey} builder. Configure the destination
-   * {@code projectId} using {@link DeleteKey#withProjectId}.
-   */
-  public DeleteKey deleteKey() {
-    return new DeleteKey(null);
-  }
-
-  /**
-   * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
-   *
-   * @see DatastoreIO
-   */
-  public static class Write extends Mutate<Entity> {
-    /**
-     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
-     * it is {@code null} at instantiation time, an error will be thrown.
-     */
-    Write(@Nullable String projectId) {
-      super(projectId, new UpsertFn());
-    }
-
-    /**
-     * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project.
-     */
-    public Write withProjectId(String projectId) {
-      checkNotNull(projectId, "projectId");
-      return new Write(projectId);
-    }
-  }
-
-  /**
-   * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore.
-   *
-   * @see DatastoreIO
-   */
-  public static class DeleteEntity extends Mutate<Entity> {
-    /**
-     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
-     * it is {@code null} at instantiation time, an error will be thrown.
-     */
-    DeleteEntity(@Nullable String projectId) {
-      super(projectId, new DeleteEntityFn());
-    }
-
-    /**
-     * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the
-     * specified project.
-     */
-    public DeleteEntity withProjectId(String projectId) {
-      checkNotNull(projectId, "projectId");
-      return new DeleteEntity(projectId);
-    }
-  }
-
-  /**
-   * A {@link PTransform} that deletes {@link Entity Entities} associated with the given
-   * {@link Key Keys} from Cloud Datastore.
-   *
-   * @see DatastoreIO
-   */
-  public static class DeleteKey extends Mutate<Key> {
-    /**
-     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
-     * it is {@code null} at instantiation time, an error will be thrown.
-     */
-    DeleteKey(@Nullable String projectId) {
-      super(projectId, new DeleteKeyFn());
-    }
-
-    /**
-     * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the
-     * specified project.
-     */
-    public DeleteKey withProjectId(String projectId) {
-      checkNotNull(projectId, "projectId");
-      return new DeleteKey(projectId);
-    }
-  }
-
-  /**
-   * A {@link PTransform} that writes mutations to Cloud Datastore.
-   *
-   * <p>It requires a {@link DoFn} that transforms an object of type {@code T} to a
-   * {@link Mutation}. {@code T} is usually either an {@link Entity} or a {@link Key}.
-   *
-   * <p><b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete) should
-   * be used by the {@code DoFn} provided, as the commits are retried when failures occur.
-   */
-  private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
-    @Nullable
-    private final String projectId;
-    /** A function that transforms each {@code T} into a mutation. */
-    private final SimpleFunction<T, Mutation> mutationFn;
-
-    /**
-     * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
-     * it is {@code null} at instantiation time, an error will be thrown.
-     */
-    Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
-      this.projectId = projectId;
-      this.mutationFn = checkNotNull(mutationFn);
-    }
-
-    @Override
-    public PDone apply(PCollection<T> input) {
-      input.apply("Convert to Mutation", MapElements.via(mutationFn))
-          .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
-
-      return PDone.in(input.getPipeline());
-    }
-
-    @Override
-    public void validate(PCollection<T> input) {
-      checkNotNull(projectId, "projectId");
-      checkNotNull(mutationFn, "mutationFn");
-    }
-
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("projectId", projectId)
-          .add("mutationFn", mutationFn.getClass().getName())
-          .toString();
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"))
-          .include(mutationFn);
-    }
-
-    public String getProjectId() {
-      return projectId;
-    }
-  }
-
-  /**
-   * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
-   * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
-   *
-   * <p>See <a
-   * href="https://cloud.google.com/datastore/docs/concepts/entities">
-   * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations.
-   *
-   * <p>Commits are non-transactional.  If a commit fails because of a conflict over an entity
-   * group, the commit will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
-   * times). This means that the mutation operation should be idempotent. Thus, the writer should
-   * only be used for {@code upsert} and {@code delete} mutation operations, as these are the only
-   * two Cloud Datastore mutations that are idempotent.
-   */
-  @VisibleForTesting
-  static class DatastoreWriterFn extends DoFn<Mutation, Void> {
-    private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
-    private final String projectId;
-    private transient Datastore datastore;
-    private final V1Beta3DatastoreFactory datastoreFactory;
-    // Current batch of mutations to be written.
-    private final List<Mutation> mutations = new ArrayList<>();
-    /**
-     * Since a bundle is written in batches, we should retry the commit of a batch in order to
-     * prevent transient errors from causing the bundle to fail.
-     */
-    private static final int MAX_RETRIES = 5;
-
-    /**
-     * Initial backoff time for exponential backoff for retry attempts.
-     */
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
-    DatastoreWriterFn(String projectId) {
-      this(projectId, new V1Beta3DatastoreFactory());
-    }
-
-    @VisibleForTesting
-    DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
-      this.projectId = checkNotNull(projectId, "projectId");
-      this.datastoreFactory = datastoreFactory;
-    }
-
-    @StartBundle
-    public void startBundle(Context c) {
-      datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      mutations.add(c.element());
-      if (mutations.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
-        flushBatch();
-      }
-    }
-
-    @FinishBundle
-    public void finishBundle(Context c) throws Exception {
-      if (mutations.size() > 0) {
-        flushBatch();
-      }
-    }
-
-    /**
-     * Writes a batch of mutations to Cloud Datastore.
-     *
-     * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
-     * times). All mutations in the batch will be committed again, even if the commit was partially
-     * successful. If the retry limit is exceeded, the last exception from the Datastore will be
-     * thrown.
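-     *
-     * <p>For example (illustrative): a transient failure on the first attempt is logged, the
-     * entire batch is committed again after a backoff, and only after {@code MAX_RETRIES}
-     * consecutive failures is the last exception rethrown.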
-     *
-     * @throws DatastoreException if the commit fails; IOException or InterruptedException if
-     * backing off between retries fails.
-     */
-    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-      LOG.debug("Writing batch of {} mutations", mutations.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-      while (true) {
-        // Batch upsert entities.
-        try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          commitRequest.addAllMutations(mutations);
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-          datastore.commit(commitRequest.build());
-          // Break if the commit threw no exception.
-          break;
-        } catch (DatastoreException exception) {
-          // Only log the code and message for potentially-transient errors. The entire exception
-          // will be propagated upon the last retry.
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      LOG.debug("Successfully wrote {} mutations", mutations.size());
-      mutations.clear();
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .addIfNotNull(DisplayData.item("projectId", projectId)
-              .withLabel("Output Project"));
-    }
-  }
-
-  /**
-   * Returns true if a Datastore key is complete. A key is complete if its last element
-   * has either an id or a name.
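-   *
-   * <p>For example, {@code makeKey("bird", "finch")} yields a complete key (the last element
-   * has a name), while {@code makeKey("bird")} yields an incomplete one (neither id nor name).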
-   */
-  static boolean isValidKey(Key key) {
-    List<PathElement> elementList = key.getPathList();
-    if (elementList.isEmpty()) {
-      return false;
-    }
-    PathElement lastElement = elementList.get(elementList.size() - 1);
-    return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
-  }
-
-  /**
-   * A function that constructs an upsert {@link Mutation} from an {@link Entity}.
-   */
-  @VisibleForTesting
-  static class UpsertFn extends SimpleFunction<Entity, Mutation> {
-    @Override
-    public Mutation apply(Entity entity) {
-      // Verify that the entity to write has a complete key.
-      checkArgument(isValidKey(entity.getKey()),
-          "Entities to be written to the Datastore must have complete keys:\n%s", entity);
-
-      return makeUpsert(entity).build();
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      builder.add(DisplayData.item("upsertFn", this.getClass())
-          .withLabel("Create Upsert Mutation"));
-    }
-  }
-
-  /**
-   * A function that constructs a delete {@link Mutation} from an {@link Entity}.
-   */
-  @VisibleForTesting
-  static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> {
-    @Override
-    public Mutation apply(Entity entity) {
-      // Verify that the entity to delete has a complete key.
-      checkArgument(isValidKey(entity.getKey()),
-          "Entities to be deleted from the Datastore must have complete keys:\n%s", entity);
-
-      return makeDelete(entity.getKey()).build();
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      builder.add(DisplayData.item("deleteEntityFn", this.getClass())
-          .withLabel("Create Delete Mutation"));
-    }
-  }
-
-  /**
-   * A function that constructs a delete {@link Mutation} from a {@link Key}.
-   */
-  @VisibleForTesting
-  static class DeleteKeyFn extends SimpleFunction<Key, Mutation> {
-    @Override
-    public Mutation apply(Key key) {
-      // Verify that the key to delete is complete.
-      checkArgument(isValidKey(key),
-          "Keys to be deleted from the Datastore must be complete:\n%s", key);
-
-      return makeDelete(key).build();
-    }
-
-    @Override
-    public void populateDisplayData(Builder builder) {
-      builder.add(DisplayData.item("deleteKeyFn", this.getClass())
-          .withLabel("Create Delete Mutation"));
-    }
-  }
-
-  /**
-   * A wrapper factory class for Datastore singleton classes {@link DatastoreFactory} and
-   * {@link QuerySplitter}.
-   *
-   * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not Java-serializable, so they
-   * are wrapped in this class, which implements {@link Serializable}.
-   */
-  @VisibleForTesting
-  static class V1Beta3DatastoreFactory implements Serializable {
-
-    /** Builds a Datastore client for the given pipeline options and project. */
-    public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
-      DatastoreOptions.Builder builder =
-          new DatastoreOptions.Builder()
-              .projectId(projectId)
-              .initializer(
-                  new RetryHttpRequestInitializer()
-              );
-
-      Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-      if (credential != null) {
-        builder.credential(credential);
-      }
-
-      return DatastoreFactory.get().create(builder.build());
-    }
-
-    /** Builds a Datastore {@link QuerySplitter}. */
-    public QuerySplitter getQuerySplitter() {
-      return DatastoreHelper.getQuerySplitter();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
new file mode 100644
index 0000000..31b5da4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.makeRequest;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.isValidKey;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntityFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKey;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKeyFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.ReadFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.UpsertFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.V1DatastoreFactory;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Write;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests for {@link DatastoreV1}.
+ */
+@RunWith(JUnit4.class)
+public class DatastoreV1Test {
+  private static final String PROJECT_ID = "testProject";
+  private static final String NAMESPACE = "testNamespace";
+  private static final String KIND = "testKind";
+  private static final Query QUERY;
+  private static final V1Options V_1_OPTIONS;
+  static {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(KIND);
+    QUERY = q.build();
+    V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE);
+  }
+  private DatastoreV1.Read initialRead;
+
+  @Mock
+  Datastore mockDatastore;
+  @Mock
+  QuerySplitter mockQuerySplitter;
+  @Mock
+  V1DatastoreFactory mockDatastoreFactory;
+
+  @Rule
+  public final ExpectedException thrown = ExpectedException.none();
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+
+    initialRead = DatastoreIO.v1().read()
+        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+
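+    // Wire the factory mock to hand back the mocked client and query splitter so the DoFns
+    // under test never contact a live Datastore service.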
+    when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class)))
+        .thenReturn(mockDatastore);
+    when(mockDatastoreFactory.getQuerySplitter())
+        .thenReturn(mockQuerySplitter);
+  }
+
+  @Test
+  public void testBuildRead() throws Exception {
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+    assertEquals(QUERY, read.getQuery());
+    assertEquals(PROJECT_ID, read.getProjectId());
+    assertEquals(NAMESPACE, read.getNamespace());
+  }
+
+  /**
+   * {@link #testBuildRead} but constructed in a different order.
+   */
+  @Test
+  public void testBuildReadAlt() throws Exception {
+    DatastoreV1.Read read =  DatastoreIO.v1().read()
+        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
+    assertEquals(QUERY, read.getQuery());
+    assertEquals(PROJECT_ID, read.getProjectId());
+    assertEquals(NAMESPACE, read.getNamespace());
+  }
+
+  @Test
+  public void testReadValidationFailsProject() throws Exception {
+    DatastoreV1.Read read =  DatastoreIO.v1().read().withQuery(QUERY);
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("project");
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadValidationFailsQuery() throws Exception {
+    DatastoreV1.Read read =  DatastoreIO.v1().read().withProjectId(PROJECT_ID);
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("query");
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadValidationFailsQueryLimitZero() throws Exception {
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid query limit 0: must be positive");
+
+    DatastoreIO.v1().read().withQuery(invalidLimit);
+  }
+
+  @Test
+  public void testReadValidationFailsQueryLimitNegative() throws Exception {
+    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Invalid query limit -5: must be positive");
+
+    DatastoreIO.v1().read().withQuery(invalidLimit);
+  }
+
+  @Test
+  public void testReadValidationSucceedsNamespace() throws Exception {
+    DatastoreV1.Read read =  DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY);
+    /* Should succeed, as a null namespace is fine. */
+    read.validate(null);
+  }
+
+  @Test
+  public void testReadDisplayData() {
+    DatastoreV1.Read read =  DatastoreIO.v1().read()
+        .withProjectId(PROJECT_ID)
+        .withQuery(QUERY)
+        .withNamespace(NAMESPACE);
+
+    DisplayData displayData = DisplayData.from(read);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+    assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
+    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testSourcePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1().read().withProjectId(
+        "myProject").withQuery(Query.newBuilder().build());
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+    assertThat("DatastoreIO read should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+  }
+
+  @Test
+  public void testWriteDoesNotAllowNullProject() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    DatastoreIO.v1().write().withProjectId(null);
+  }
+
+  @Test
+  public void testWriteValidationFailsWithNoProject() throws Exception {
+    Write write =  DatastoreIO.v1().write();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    write.validate(null);
+  }
+
+  @Test
+  public void testWriteValidationSucceedsWithProject() throws Exception {
+    Write write =  DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+    write.validate(null);
+  }
+
+  @Test
+  public void testWriteDisplayData() {
+    Write write =  DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+
+    DisplayData displayData = DisplayData.from(write);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  @Test
+  public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    DatastoreIO.v1().deleteEntity().withProjectId(null);
+  }
+
+  @Test
+  public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
+    DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    deleteEntity.validate(null);
+  }
+
+  @Test
+  public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
+    DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
+    deleteEntity.validate(null);
+  }
+
+  @Test
+  public void testDeleteEntityDisplayData() {
+    DeleteEntity deleteEntity =  DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
+
+    DisplayData displayData = DisplayData.from(deleteEntity);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  @Test
+  public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    DatastoreIO.v1().deleteKey().withProjectId(null);
+  }
+
+  @Test
+  public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
+    DeleteKey deleteKey = DatastoreIO.v1().deleteKey();
+
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("projectId");
+
+    deleteKey.validate(null);
+  }
+
+  @Test
+  public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
+    DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
+    deleteKey.validate(null);
+  }
+
+  @Test
+  public void testDeleteKeyDisplayData() {
+    DeleteKey deleteKey =  DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
+
+    DisplayData displayData = DisplayData.from(deleteKey);
+
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testWritePrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Entity>, ?> write =
+        DatastoreIO.v1().write().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+    assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
+        displayData, hasItem(hasDisplayItem("upsertFn")));
+
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDeleteEntityPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Entity>, ?> write =
+        DatastoreIO.v1().deleteEntity().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+    assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data",
+        displayData, hasItem(hasDisplayItem("deleteEntityFn")));
+
+  }
+
+  @Test
+  @Category(RunnableOnService.class)
+  public void testDeleteKeyPrimitiveDisplayData() {
+    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+    PTransform<PCollection<Key>, ?> write =
+        DatastoreIO.v1().deleteKey().withProjectId("myProject");
+
+    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+    assertThat("DatastoreIO write should include the project in its primitive display data",
+        displayData, hasItem(hasDisplayItem("projectId")));
+    assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data",
+        displayData, hasItem(hasDisplayItem("deleteKeyFn")));
+
+  }
+
+  /**
+   * Test building a Write using builder methods.
+   */
+  @Test
+  public void testBuildWrite() throws Exception {
+    DatastoreV1.Write write =  DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+    assertEquals(PROJECT_ID, write.getProjectId());
+  }
+
+  /**
+   * Test the detection of complete and incomplete keys.
+   */
+  @Test
+  public void testHasNameOrId() {
+    Key key;
+    // Complete with name, no ancestor
+    key = makeKey("bird", "finch").build();
+    assertTrue(isValidKey(key));
+
+    // Complete with id, no ancestor
+    key = makeKey("bird", 123).build();
+    assertTrue(isValidKey(key));
+
+    // Incomplete, no ancestor
+    key = makeKey("bird").build();
+    assertFalse(isValidKey(key));
+
+    // Complete with name and ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", "horned").build();
+    assertTrue(isValidKey(key));
+
+    // Complete with id and ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird", 123).build();
+    assertTrue(isValidKey(key));
+
+    // Incomplete with ancestor
+    key = makeKey("bird", "owl").build();
+    key = makeKey(key, "bird").build();
+    assertFalse(isValidKey(key));
+
+    key = makeKey().build();
+    assertFalse(isValidKey(key));
+  }
+
+  /**
+   * Test that entities with incomplete keys cannot be updated.
+   */
+  @Test
+  public void testAddEntitiesWithIncompleteKeys() throws Exception {
+    Key key = makeKey("bird").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    UpsertFn upsertFn = new UpsertFn();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
+
+    upsertFn.apply(entity);
+  }
+
+  /**
+   * Test that entities with valid keys are transformed to upsert mutations.
+   */
+  @Test
+  public void testAddEntities() throws Exception {
+    Key key = makeKey("bird", "finch").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    UpsertFn upsertFn = new UpsertFn();
+
+    Mutation expectedMutation = makeUpsert(entity).build();
+    assertEquals(expectedMutation, upsertFn.apply(entity));
+  }
+
+  /**
+   * Test that entities with incomplete keys cannot be deleted.
+   */
+  @Test
+  public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
+    Key key = makeKey("bird").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
+
+    deleteEntityFn.apply(entity);
+  }
+
+  /**
+   * Test that entities with valid keys are transformed to delete mutations.
+   */
+  @Test
+  public void testDeleteEntities() throws Exception {
+    Key key = makeKey("bird", "finch").build();
+    Entity entity = Entity.newBuilder().setKey(key).build();
+    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+
+    Mutation expectedMutation = makeDelete(entity.getKey()).build();
+    assertEquals(expectedMutation, deleteEntityFn.apply(entity));
+  }
+
+  /**
+   * Test that incomplete keys cannot be deleted.
+   */
+  @Test
+  public void testDeleteIncompleteKeys() throws Exception {
+    Key key = makeKey("bird").build();
+    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
+
+    deleteKeyFn.apply(key);
+  }
+
+  /**
+   * Test that valid keys are transformed to delete mutations.
+   */
+  @Test
+  public void testDeleteKeys() throws Exception {
+    Key key = makeKey("bird", "finch").build();
+    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+
+    Mutation expectedMutation = makeDelete(key).build();
+    assertEquals(expectedMutation, deleteKeyFn.apply(key));
+  }
+
+  @Test
+  public void testDatastoreWriteFnDisplayData() {
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID);
+    DisplayData displayData = DisplayData.from(datastoreWriter);
+    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+  }
+
+  /** Tests {@link DatastoreWriterFn} with fewer entities than one batch. */
+  @Test
+  public void testDatastoreWriterFnWithOneBatch() throws Exception {
+    datastoreWriterFnTest(100);
+  }
+
+  /** Tests {@link DatastoreWriterFn} with more than one batch of entities, not an exact multiple. */
+  @Test
+  public void testDatastoreWriterFnWithMultipleBatches() throws Exception {
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
+  }
+
+  /**
+   * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
+   * write batch size.
+   */
+  @Test
+  public void testDatastoreWriterFnWithBatchesExactMultiple() throws Exception {
+    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
+  }
+
+  // A helper method to test DatastoreWriterFn for various batch sizes.
+  private void datastoreWriterFnTest(int numMutations) throws Exception {
+    // Create the requested number of mutations.
+    List<Mutation> mutations = new ArrayList<>(numMutations);
+    for (int i = 0; i < numMutations; ++i) {
+      mutations.add(
+          makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+    }
+
+    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
+    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    doFnTester.processBundle(mutations);
+
+    int start = 0;
+    while (start < numMutations) {
+      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
+      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+      commitRequest.addAllMutations(mutations.subList(start, end));
+      // Verify all the batch requests were made with the expected mutations.
+      verify(mockDatastore, times(1)).commit(commitRequest.build());
+      start = end;
+    }
+  }
+
+  /**
+   * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
+   * query.
+   */
+  @Test
+  public void testEstimatedSizeBytes() throws Exception {
+    long entityBytes = 100L;
+    // Per Kind statistics request and response
+    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+    when(mockDatastore.runQuery(statRequest))
+        .thenReturn(statResponse);
+
+    assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
+    verify(mockDatastore, times(1)).runQuery(statRequest);
+  }
+
+  /**
+   * Tests {@link SplitQueryFn} when number of query splits is specified.
+   */
+  @Test
+  public void testSplitQueryFnWithNumSplits() throws Exception {
+    int numSplits = 100;
+    when(mockQuerySplitter.getSplits(
+        eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
+        .thenReturn(splitQuery(QUERY, numSplits));
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+    /*
+     * Although the Datastore client is marked transient in {@link SplitQueryFn}, when injected
+     * through the mock factory for unit testing it is not serializable, because it doesn't have
+     * a no-arg constructor. Cloning is disabled to prevent the DoFn from being serialized.
+     */
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
+
+    assertEquals(numSplits, queries.size());
+    verifyUniqueKeys(queries);
+    verify(mockQuerySplitter, times(1)).getSplits(
+        eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
+    verifyZeroInteractions(mockDatastore);
+  }
+
+  /**
+   * Tests {@link SplitQueryFn} when the number of query splits is not specified.
+   */
+  @Test
+  public void testSplitQueryFnWithoutNumSplits() throws Exception {
+    // Force SplitQueryFn to compute the number of query splits
+    int numSplits = 0;
+    int expectedNumSplits = 20;
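+    // Size the stat response so that entityBytes / DEFAULT_BUNDLE_SIZE_BYTES = expectedNumSplits.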
+    long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
+
+    // Per Kind statistics request and response
+    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+    when(mockDatastore.runQuery(statRequest))
+        .thenReturn(statResponse);
+    when(mockQuerySplitter.getSplits(
+        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
+        .thenReturn(splitQuery(QUERY, expectedNumSplits));
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
+
+    assertEquals(expectedNumSplits, queries.size());
+    verifyUniqueKeys(queries);
+    verify(mockQuerySplitter, times(1)).getSplits(
+        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
+    verify(mockDatastore, times(1)).runQuery(statRequest);
+  }
+
+  /**
+   * Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit.
+   */
+  @Test
+  public void testSplitQueryFnWithQueryLimit() throws Exception {
+    Query queryWithLimit = QUERY.toBuilder().clone()
+        .setLimit(Int32Value.newBuilder().setValue(1))
+        .build();
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
+
+    assertEquals(1, queries.size());
+    verifyUniqueKeys(queries);
+    verifyNoMoreInteractions(mockDatastore);
+    verifyNoMoreInteractions(mockQuerySplitter);
+  }
+
+  /** Tests {@link ReadFn} with a query limit smaller than one batch. */
+  @Test
+  public void testReadFnWithOneBatch() throws Exception {
+    readFnTest(5);
+  }
+
+  /** Tests {@link ReadFn} with a query limit larger than one batch, but not an exact multiple. */
+  @Test
+  public void testReadFnWithMultipleBatches() throws Exception {
+    readFnTest(QUERY_BATCH_LIMIT + 5);
+  }
+
+  /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
+  @Test
+  public void testReadFnWithBatchesExactMultiple() throws Exception {
+    readFnTest(5 * QUERY_BATCH_LIMIT);
+  }
+
+  // Helper methods.
+
+  /** A helper function that verifies if all the queries have unique keys. */
+  private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
+    Set<Integer> keys = new HashSet<>();
+    for (KV<Integer, Query> kv: queries) {
+      keys.add(kv.getKey());
+    }
+    assertEquals(keys.size(), queries.size());
+  }
+
+  /**
+   * A helper function that creates mock {@link Entity} results in response to a query. Always
+   * indicates that more results are available, unless the batch is limited to fewer than
+   * {@link DatastoreV1.Read#QUERY_BATCH_LIMIT} results.
+   */
+  private static RunQueryResponse mockResponseForQuery(Query q) {
+    // Every query DatastoreV1 sends should have a limit.
+    assertTrue(q.hasLimit());
+
+    // The limit should be in the range [1, QUERY_BATCH_LIMIT]
+    int limit = q.getLimit().getValue();
+    assertThat(limit, greaterThanOrEqualTo(1));
+    assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
+
+    // Create the requested number of entities.
+    List<EntityResult> entities = new ArrayList<>(limit);
+    for (int i = 0; i < limit; ++i) {
+      entities.add(
+          EntityResult.newBuilder()
+              .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
+              .build());
+    }
+
+    // Fill out the other parameters on the returned result batch.
+    RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
+    ret.getBatchBuilder()
+        .addAllEntityResults(entities)
+        .setEntityResultType(EntityResult.ResultType.FULL)
+        .setMoreResults(
+            limit == QUERY_BATCH_LIMIT
+                ? QueryResultBatch.MoreResultsType.NOT_FINISHED
+                : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
+
+    return ret.build();
+  }
+
+  /** Helper function to run a test reading from a {@link ReadFn}. */
+  private void readFnTest(int numEntities) throws Exception {
+    // An empty query to read entities.
+    Query query = Query.newBuilder().setLimit(
+        Int32Value.newBuilder().setValue(numEntities)).build();
+
+    // Use mockResponseForQuery to generate results.
+    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
+        .thenAnswer(new Answer<RunQueryResponse>() {
+          @Override
+          public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
+            Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
+            return mockResponseForQuery(q);
+          }
+        });
+
+    ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
+    DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
+    /*
+     * Although the Datastore client is marked transient in {@link ReadFn}, when injected through
+     * the mock factory for unit testing it is not serializable, because it doesn't have a no-arg
+     * constructor. Cloning is disabled to prevent the test object from being serialized.
+     */
+    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+    List<Entity> entities = doFnTester.processBundle(query);
+
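+    // ReadFn pages through results at most QUERY_BATCH_LIMIT entities per runQuery call,
+    // so ceil(numEntities / QUERY_BATCH_LIMIT) calls are expected in total.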
+    int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
+    verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
+    // Validate the number of results.
+    assertEquals(numEntities, entities.size());
+  }
+
+  /** Builds a per-kind statistics response with the given entity size. */
+  private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
+    RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
+    Entity.Builder entity = Entity.newBuilder();
+    entity.setKey(makeKey("dummyKind", "dummyId"));
+    entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
+    EntityResult.Builder entityResult = EntityResult.newBuilder();
+    entityResult.setEntity(entity);
+    QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
+    batch.addEntityResults(entityResult);
+    timestampResponse.setBatch(batch);
+    return timestampResponse.build();
+  }
+
+  /** Builds a per-kind statistics query for the given namespace, ordered by latest timestamp. */
+  private static Query makeStatKindQuery(String namespace) {
+    Query.Builder statQuery = Query.newBuilder();
+    if (namespace == null) {
+      statQuery.addKindBuilder().setName("__Stat_Kind__");
+    } else {
+      statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
+    }
+    statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
+    statQuery.addOrder(makeOrder("timestamp", DESCENDING));
+    statQuery.setLimit(Int32Value.newBuilder().setValue(1));
+    return statQuery.build();
+  }
+
+  /** Generate dummy query splits. */
+  private List<Query> splitQuery(Query query, int numSplits) {
+    List<Query> queries = new LinkedList<>();
+    for (int i = 0; i < numSplits; i++) {
+      queries.add(query.toBuilder().clone().build());
+    }
+    return queries;
+  }
+}
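
A pattern worth noting in the test above: DoFnTester cannot serialize a DoFn whose
injected client is a Mockito mock, so cloning is disabled before processing a bundle.
A minimal sketch of that pattern follows; DoNotCloneExample, ToUpperFn, and stubClient
are hypothetical names introduced here only for illustration, not part of this commit:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.DoFnTester;
    import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;

    public class DoNotCloneExample {

      /** Hypothetical DoFn holding a non-serializable member, like the mocked Datastore above. */
      static class ToUpperFn extends DoFn<String, String> {
        private final transient Object stubClient;  // stands in for a Mockito mock

        ToUpperFn(Object stubClient) {
          this.stubClient = stubClient;
        }

        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(c.element().toUpperCase());
        }
      }

      public static void main(String[] args) throws Exception {
        DoFnTester<String, String> tester = DoFnTester.of(new ToUpperFn(new Object()));
        // Without this, DoFnTester would serialize the DoFn and fail on the mock member.
        tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
        List<String> out = tester.processBundle(Arrays.asList("a", "b"));
        System.out.println(out);  // [A, B]
      }
    }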

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java
deleted file mode 100644
index ddb6d81..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.getDatastore;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.makeAncestorKey;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.makeEntity;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.UpsertMutationBuilder;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.V1Beta3TestWriter;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.client.Datastore;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.UUID;
-
-/**
- * End-to-end tests for Datastore V1Beta3.Read.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3ReadIT {
-  private V1Beta3TestOptions options;
-  private String ancestor;
-  private final long numEntities = 1000;
-
-  @Before
-  public void setup() {
-    PipelineOptionsFactory.register(V1Beta3TestOptions.class);
-    options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class);
-    ancestor = UUID.randomUUID().toString();
-  }
-
-  /**
-   * An end-to-end test for {@link V1Beta3.Read}.
-   *
-   * Write some test entities to datastore and then run a dataflow pipeline that
-   * reads and counts the total number of entities. Verify that the count matches the
-   * number of entities written.
-   */
-  @Test
-  public void testE2EV1Beta3Read() throws Exception {
-    // Create entities and write them to datastore
-    writeEntitiesToDatastore(options, ancestor, numEntities);
-
-    // Read from datastore
-    Query query = V1Beta3TestUtil.makeAncestorKindQuery(
-        options.getKind(), options.getNamespace(), ancestor);
-
-    V1Beta3.Read read = DatastoreIO.v1beta3().read()
-        .withProjectId(options.getProject())
-        .withQuery(query)
-        .withNamespace(options.getNamespace());
-
-    // Count the total number of entities
-    Pipeline p = Pipeline.create(options);
-    PCollection<Long> count = p
-        .apply(read)
-        .apply(Count.<Entity>globally());
-
-    PAssert.thatSingleton(count).isEqualTo(numEntities);
-    p.run();
-  }
-
-  // Creates entities and write them to datastore
-  private static void writeEntitiesToDatastore(V1Beta3TestOptions options, String ancestor,
-      long numEntities) throws Exception {
-    Datastore datastore = getDatastore(options, options.getProject());
-    // Write test entities to datastore
-    V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new UpsertMutationBuilder());
-    Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
-
-    for (long i = 0; i < numEntities; i++) {
-      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
-      writer.write(entity);
-    }
-    writer.close();
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    deleteAllEntities(options, ancestor);
-  }
-}
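
The end-to-end read coverage removed here carries over to the v1 API: the same
ancestor-query pipeline is built against DatastoreIO.v1() instead of
DatastoreIO.v1beta3(). A sketch of the renamed read, assuming the v1 proto types
under com.google.datastore.v1 and with query, namespace, and pipeline in scope;
"my-project" is a hypothetical project id:

    import com.google.datastore.v1.Entity;
    import org.apache.beam.sdk.transforms.Count;
    import org.apache.beam.sdk.values.PCollection;

    DatastoreV1.Read read = DatastoreIO.v1().read()
        .withProjectId("my-project")   // hypothetical
        .withQuery(query)
        .withNamespace(namespace);     // a null namespace remains valid

    PCollection<Long> count = pipeline
        .apply(read)
        .apply(Count.<Entity>globally());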


[2/4] incubator-beam git commit: DatastoreIO v1beta3 to v1

Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
deleted file mode 100644
index b0c6c18..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ /dev/null
@@ -1,792 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Tests for {@link V1Beta3}.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3Test {
-  private static final String PROJECT_ID = "testProject";
-  private static final String NAMESPACE = "testNamespace";
-  private static final String KIND = "testKind";
-  private static final Query QUERY;
-  private static final V1Beta3Options v1Beta3Options;
-  static {
-    Query.Builder q = Query.newBuilder();
-    q.addKindBuilder().setName(KIND);
-    QUERY = q.build();
-    v1Beta3Options = V1Beta3Options.from(PROJECT_ID, QUERY, NAMESPACE);
-  }
-  private V1Beta3.Read initialRead;
-
-  @Mock
-  Datastore mockDatastore;
-  @Mock
-  QuerySplitter mockQuerySplitter;
-  @Mock
-  V1Beta3DatastoreFactory mockDatastoreFactory;
-
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-
-  @Before
-  public void setUp() {
-    MockitoAnnotations.initMocks(this);
-
-    initialRead = DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-
-    when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class)))
-        .thenReturn(mockDatastore);
-    when(mockDatastoreFactory.getQuerySplitter())
-        .thenReturn(mockQuerySplitter);
-  }
-
-  @Test
-  public void testBuildRead() throws Exception {
-    V1Beta3.Read read = DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
-  }
-
-  /**
-   * {@link #testBuildRead} but constructed in a different order.
-   */
-  @Test
-  public void testBuildReadAlt() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
-        .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId());
-    assertEquals(NAMESPACE, read.getNamespace());
-  }
-
-  @Test
-  public void testReadValidationFailsProject() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withQuery(QUERY);
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("project");
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQuery() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("query");
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0: must be positive");
-
-    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5: must be positive");
-
-    DatastoreIO.v1beta3().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationSucceedsNamespace() throws Exception {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
-    /* Should succeed, as a null namespace is fine. */
-    read.validate(null);
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    V1Beta3.Read read =  DatastoreIO.v1beta3().read()
-      .withProjectId(PROJECT_ID)
-      .withQuery(QUERY)
-      .withNamespace(NAMESPACE);
-
-    DisplayData displayData = DisplayData.from(read);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-    assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
-    assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testSourcePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
-        "myProject").withQuery(Query.newBuilder().build());
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
-    assertThat("DatastoreIO read should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-  }
-
-  @Test
-  public void testWriteDoesNotAllowNullProject() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    DatastoreIO.v1beta3().write().withProjectId(null);
-  }
-
-  @Test
-  public void testWriteValidationFailsWithNoProject() throws Exception {
-    Write write =  DatastoreIO.v1beta3().write();
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    write.validate(null);
-  }
-
-  @Test
-  public void testWriteValidationSucceedsWithProject() throws Exception {
-    Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-    write.validate(null);
-  }
-
-  @Test
-  public void testWriteDisplayData() {
-    Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-
-    DisplayData displayData = DisplayData.from(write);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  @Test
-  public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    DatastoreIO.v1beta3().deleteEntity().withProjectId(null);
-  }
-
-  @Test
-  public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
-    DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity();
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    deleteEntity.validate(null);
-  }
-
-  @Test
-  public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
-    DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
-    deleteEntity.validate(null);
-  }
-
-  @Test
-  public void testDeleteEntityDisplayData() {
-    DeleteEntity deleteEntity =  DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
-
-    DisplayData displayData = DisplayData.from(deleteEntity);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  @Test
-  public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    DatastoreIO.v1beta3().deleteKey().withProjectId(null);
-  }
-
-  @Test
-  public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
-    DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey();
-
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("projectId");
-
-    deleteKey.validate(null);
-  }
-
-  @Test
-  public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
-    DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
-    deleteKey.validate(null);
-  }
-
-  @Test
-  public void testDeleteKeyDisplayData() {
-    DeleteKey deleteKey =  DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
-
-    DisplayData displayData = DisplayData.from(deleteKey);
-
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testWritePrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1beta3().write().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-    assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
-        displayData, hasItem(hasDisplayItem("upsertFn")));
-
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testDeleteEntityPrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Entity>, ?> write =
-        DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-    assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data",
-        displayData, hasItem(hasDisplayItem("deleteEntityFn")));
-
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testDeleteKeyPrimitiveDisplayData() {
-    DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
-    PTransform<PCollection<Key>, ?> write =
-        DatastoreIO.v1beta3().deleteKey().withProjectId("myProject");
-
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
-    assertThat("DatastoreIO write should include the project in its primitive display data",
-        displayData, hasItem(hasDisplayItem("projectId")));
-    assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data",
-        displayData, hasItem(hasDisplayItem("deleteKeyFn")));
-
-  }
-
-  /**
-   * Test building a Write using builder methods.
-   */
-  @Test
-  public void testBuildWrite() throws Exception {
-    V1Beta3.Write write =  DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-    assertEquals(PROJECT_ID, write.getProjectId());
-  }
-
-  /**
-   * Test the detection of complete and incomplete keys.
-   */
-  @Test
-  public void testHasNameOrId() {
-    Key key;
-    // Complete with name, no ancestor
-    key = makeKey("bird", "finch").build();
-    assertTrue(isValidKey(key));
-
-    // Complete with id, no ancestor
-    key = makeKey("bird", 123).build();
-    assertTrue(isValidKey(key));
-
-    // Incomplete, no ancestor
-    key = makeKey("bird").build();
-    assertFalse(isValidKey(key));
-
-    // Complete with name and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", "horned").build();
-    assertTrue(isValidKey(key));
-
-    // Complete with id and ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird", 123).build();
-    assertTrue(isValidKey(key));
-
-    // Incomplete with ancestor
-    key = makeKey("bird", "owl").build();
-    key = makeKey(key, "bird").build();
-    assertFalse(isValidKey(key));
-
-    key = makeKey().build();
-    assertFalse(isValidKey(key));
-  }
-
-  /**
-   * Test that entities with incomplete keys cannot be updated.
-   */
-  @Test
-  public void testAddEntitiesWithIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    UpsertFn upsertFn = new UpsertFn();
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
-
-    upsertFn.apply(entity);
-  }
-
-  @Test
-  /**
-   * Test that entities with valid keys are transformed to upsert mutations.
-   */
-  public void testAddEntities() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    UpsertFn upsertFn = new UpsertFn();
-
-    Mutation exceptedMutation = makeUpsert(entity).build();
-    assertEquals(upsertFn.apply(entity), exceptedMutation);
-  }
-
-  /**
-   * Test that entities with incomplete keys cannot be deleted.
-   */
-  @Test
-  public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
-
-    deleteEntityFn.apply(entity);
-  }
-
-  /**
-   * Test that entities with valid keys are transformed to delete mutations.
-   */
-  @Test
-  public void testDeleteEntities() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    Entity entity = Entity.newBuilder().setKey(key).build();
-    DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
-    Mutation exceptedMutation = makeDelete(entity.getKey()).build();
-    assertEquals(deleteEntityFn.apply(entity), exceptedMutation);
-  }
-
-  /**
-   * Test that incomplete keys cannot be deleted.
-   */
-  @Test
-  public void testDeleteIncompleteKeys() throws Exception {
-    Key key = makeKey("bird").build();
-    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
-
-    deleteKeyFn.apply(key);
-  }
-
-  /**
-   * Test that valid keys are transformed to delete mutations.
-   */
-  @Test
-  public void testDeleteKeys() throws Exception {
-    Key key = makeKey("bird", "finch").build();
-    DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
-    Mutation exceptedMutation = makeDelete(key).build();
-    assertEquals(deleteKeyFn.apply(key), exceptedMutation);
-  }
-
-  @Test
-  public void testDatastoreWriteFnDisplayData() {
-    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID);
-    DisplayData displayData = DisplayData.from(datastoreWriter);
-    assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
-  }
-
-  /** Tests {@link DatastoreWriterFn} with entities less than one batch. */
-  @Test
-  public void testDatatoreWriterFnWithOneBatch() throws Exception {
-    datastoreWriterFnTest(100);
-  }
-
-  /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */
-  @Test
-  public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
-  }
-
-  /**
-   * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
-   * write batch size.
-   */
-  @Test
-  public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
-    datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
-  }
-
-  // A helper method to test DatastoreWriterFn for various batch sizes.
-  private void datastoreWriterFnTest(int numMutations) throws Exception {
-    // Create the requested number of mutations.
-    List<Mutation> mutations = new ArrayList<>(numMutations);
-    for (int i = 0; i < numMutations; ++i) {
-      mutations.add(
-          makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
-    }
-
-    DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
-    DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    doFnTester.processBundle(mutations);
-
-    int start = 0;
-    while (start < numMutations) {
-      int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
-      CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-      commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-        commitRequest.addAllMutations(mutations.subList(start, end));
-      // Verify all the batch requests were made with the expected mutations.
-      verify(mockDatastore, times(1)).commit(commitRequest.build());
-      start = end;
-    }
-  }
-
-  /**
-   * Tests {@link V1Beta3.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
-   * query.
-   */
-  @Test
-  public void testEstimatedSizeBytes() throws Exception {
-    long entityBytes = 100L;
-    // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
-    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
-    when(mockDatastore.runQuery(statRequest))
-        .thenReturn(statResponse);
-
-    assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
-    verify(mockDatastore, times(1)).runQuery(statRequest);
-  }
-
-  /**
-   * Tests {@link SplitQueryFn} when number of query splits is specified.
-   */
-  @Test
-  public void testSplitQueryFnWithNumSplits() throws Exception {
-    int numSplits = 100;
-    when(mockQuerySplitter.getSplits(
-        eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
-        .thenReturn(splitQuery(QUERY, numSplits));
-
-    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
-    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
-    /**
-     * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through
-     * mock factory using a when clause for unit testing purposes, it is not serializable
-     * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
-     * doFn from being serialized.
-     */
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
-    assertEquals(queries.size(), numSplits);
-    verifyUniqueKeys(queries);
-    verify(mockQuerySplitter, times(1)).getSplits(
-        eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
-    verifyZeroInteractions(mockDatastore);
-  }
-
-  /**
-   * Tests {@link SplitQueryFn} when no query splits is specified.
-   */
-  @Test
-  public void testSplitQueryFnWithoutNumSplits() throws Exception {
-    // Force SplitQueryFn to compute the number of query splits
-    int numSplits = 0;
-    int expectedNumSplits = 20;
-    long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
-
-    // Per Kind statistics request and response
-    RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
-    RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
-    when(mockDatastore.runQuery(statRequest))
-        .thenReturn(statResponse);
-    when(mockQuerySplitter.getSplits(
-        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
-        .thenReturn(splitQuery(QUERY, expectedNumSplits));
-
-    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
-    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
-    assertEquals(queries.size(), expectedNumSplits);
-    verifyUniqueKeys(queries);
-    verify(mockQuerySplitter, times(1)).getSplits(
-        eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
-    verify(mockDatastore, times(1)).runQuery(statRequest);
-  }
-
-  /**
-   * Tests {@link V1Beta3.Read.SplitQueryFn} when the query has a user specified limit.
-   */
-  @Test
-  public void testSplitQueryFnWithQueryLimit() throws Exception {
-    Query queryWithLimit = QUERY.toBuilder().clone()
-        .setLimit(Int32Value.newBuilder().setValue(1))
-        .build();
-
-    SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, 10, mockDatastoreFactory);
-    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
-
-    assertEquals(queries.size(), 1);
-    verifyUniqueKeys(queries);
-    verifyNoMoreInteractions(mockDatastore);
-    verifyNoMoreInteractions(mockQuerySplitter);
-  }
-
-  /** Tests {@link ReadFn} with a query limit less than one batch. */
-  @Test
-  public void testReadFnWithOneBatch() throws Exception {
-    readFnTest(5);
-  }
-
-  /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */
-  @Test
-  public void testReadFnWithMultipleBatches() throws Exception {
-    readFnTest(QUERY_BATCH_LIMIT + 5);
-  }
-
-  /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
-  @Test
-  public void testReadFnWithBatchesExactMultiple() throws Exception {
-    readFnTest(5 * QUERY_BATCH_LIMIT);
-  }
-
-  /** Helper Methods */
-
-  /** A helper function that verifies if all the queries have unique keys. */
-  private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
-    Set<Integer> keys = new HashSet<>();
-    for (KV<Integer, Query> kv: queries) {
-      keys.add(kv.getKey());
-    }
-    assertEquals(keys.size(), queries.size());
-  }
-
-  /**
-   * A helper function that creates mock {@link Entity} results in response to a query. Always
-   * indicates that more results are available, unless the batch is limited to fewer than
-   * {@link V1Beta3.Read#QUERY_BATCH_LIMIT} results.
-   */
-  private static RunQueryResponse mockResponseForQuery(Query q) {
-    // Every query V1Beta3 sends should have a limit.
-    assertTrue(q.hasLimit());
-
-    // The limit should be in the range [1, QUERY_BATCH_LIMIT]
-    int limit = q.getLimit().getValue();
-    assertThat(limit, greaterThanOrEqualTo(1));
-    assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
-
-    // Create the requested number of entities.
-    List<EntityResult> entities = new ArrayList<>(limit);
-    for (int i = 0; i < limit; ++i) {
-      entities.add(
-          EntityResult.newBuilder()
-              .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
-              .build());
-    }
-
-    // Fill out the other parameters on the returned result batch.
-    RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
-    ret.getBatchBuilder()
-        .addAllEntityResults(entities)
-        .setEntityResultType(EntityResult.ResultType.FULL)
-        .setMoreResults(
-            limit == QUERY_BATCH_LIMIT
-                ? QueryResultBatch.MoreResultsType.NOT_FINISHED
-                : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
-
-    return ret.build();
-  }
-
-  /** Helper function to run a test reading from a {@link ReadFn}. */
-  private void readFnTest(int numEntities) throws Exception {
-    // An empty query to read entities.
-    Query query = Query.newBuilder().setLimit(
-        Int32Value.newBuilder().setValue(numEntities)).build();
-
-    // Use mockResponseForQuery to generate results.
-    when(mockDatastore.runQuery(any(RunQueryRequest.class)))
-        .thenAnswer(new Answer<RunQueryResponse>() {
-          @Override
-          public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
-            Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
-            return mockResponseForQuery(q);
-          }
-        });
-
-    ReadFn readFn = new ReadFn(v1Beta3Options, mockDatastoreFactory);
-    DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
-    /**
-     * Although Datastore client is marked transient in {@link ReadFn}, when injected through
-     * mock factory using a when clause for unit testing purposes, it is not serializable
-     * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
-     * test object from being serialized.
-     */
-    doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
-    List<Entity> entities = doFnTester.processBundle(query);
-
-    int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
-    verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
-    // Validate the number of results.
-    assertEquals(numEntities, entities.size());
-  }
-
-  /** Builds a per-kind statistics response with the given entity size. */
-  private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
-    RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
-    Entity.Builder entity = Entity.newBuilder();
-    entity.setKey(makeKey("dummyKind", "dummyId"));
-    entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
-    EntityResult.Builder entityResult = EntityResult.newBuilder();
-    entityResult.setEntity(entity);
-    QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
-    batch.addEntityResults(entityResult);
-    timestampResponse.setBatch(batch);
-    return timestampResponse.build();
-  }
-
-  /** Builds a per-kind statistics query for the given timestamp and namespace. */
-  private static Query makeStatKindQuery(String namespace) {
-    Query.Builder statQuery = Query.newBuilder();
-    if (namespace == null) {
-      statQuery.addKindBuilder().setName("__Stat_Kind__");
-    } else {
-      statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
-    }
-    statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
-    statQuery.addOrder(makeOrder("timestamp", DESCENDING));
-    statQuery.setLimit(Int32Value.newBuilder().setValue(1));
-    return statQuery.build();
-  }
-
-  /** Generate dummy query splits. */
-  private List<Query> splitQuery(Query query, int numSplits) {
-    List<Query> queries = new LinkedList<>();
-    for (int i = 0; i < numSplits; i++) {
-      queries.add(query.toBuilder().clone().build());
-    }
-    return queries;
-  }
-}
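
Apart from the assertion-order and naming cleanups, the deletion above is mechanical:
the test body survives in DatastoreV1Test with the v1beta3 names swapped for their v1
equivalents. A representative pair of changes, assuming the v1 proto package
com.google.datastore.v1 used by the renamed sources:

    // v1beta3 (removed):
    import com.google.datastore.v1beta3.Entity;
    import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey;
    V1Beta3.Read read = DatastoreIO.v1beta3().read();

    // v1 (replacement):
    import com.google.datastore.v1.Entity;
    import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.isValidKey;
    DatastoreV1.Read read = DatastoreIO.v1().read();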

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
deleted file mode 100644
index 099ebe0..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-
-import javax.annotation.Nullable;
-
-/**
- * V1Beta3 Datastore related pipeline options.
- */
-public interface V1Beta3TestOptions extends TestPipelineOptions {
-  @Description("Project ID to read from datastore")
-  @Default.String("apache-beam-testing")
-  String getProject();
-  void setProject(String value);
-
-  @Description("Datastore Entity kind")
-  @Default.String("beam-test")
-  String getKind();
-  void setKind(String value);
-
-  @Description("Datastore Namespace")
-  String getNamespace();
-  void setNamespace(@Nullable String value);
-}
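
The options interface removed above follows the standard PipelineOptions pattern:
annotated getter/setter pairs with optional defaults, registered before use. A short
fragment showing how such an interface is consumed, mirroring the setup in the read
integration test; MyTestOptions is a hypothetical stand-in for the renamed options class:

    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.testing.TestPipelineOptions;

    public interface MyTestOptions extends TestPipelineOptions {
      @Description("Datastore Entity kind")
      @Default.String("beam-test")
      String getKind();
      void setKind(String value);
    }

    // Registration makes the interface's flags parseable; defaults apply when unset.
    PipelineOptionsFactory.register(MyTestOptions.class);
    MyTestOptions options = TestPipeline.testingPipelineOptions().as(MyTestOptions.class);
    String kind = options.getKind();  // "beam-test" unless --kind was passed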

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
deleted file mode 100644
index 7eaf23e..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import javax.annotation.Nullable;
-
-class V1Beta3TestUtil {
-  private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestUtil.class);
-
-  /**
-   * A helper function to create the ancestor key for all created and queried entities.
-   */
-  static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
-    Key.Builder keyBuilder = makeKey(kind, ancestor);
-    if (namespace != null) {
-      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-    }
-    return keyBuilder.build();
-  }
-
-  /**
-   * Build a datastore ancestor query for the specified kind, namespace and ancestor.
-   */
-  static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
-    Query.Builder q = Query.newBuilder();
-    q.addKindBuilder().setName(kind);
-    q.setFilter(makeFilter(
-        "__key__",
-        PropertyFilter.Operator.HAS_ANCESTOR,
-        makeValue(makeAncestorKey(namespace, kind, ancestor))));
-    return q.build();
-  }
-
-  /**
-   * Build an entity for the given ancestorKey, kind, namespace and value.
-   */
-  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
-    Entity.Builder entityBuilder = Entity.newBuilder();
-    Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
-    // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
-    // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
-    // we can simplify this code.
-    if (namespace != null) {
-      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-    }
-
-    entityBuilder.setKey(keyBuilder.build());
-    entityBuilder.getMutableProperties().put("value", makeValue(value).build());
-    return entityBuilder.build();
-  }
-
-  /**
-   * A DoFn that creates an entity for each long number.
-   */
-  static class CreateEntityFn extends DoFn<Long, Entity> {
-    private final String kind;
-    @Nullable
-    private final String namespace;
-    private Key ancestorKey;
-
-    CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
-      this.kind = kind;
-      this.namespace = namespace;
-      // Build the ancestor key for all created entities once, including the namespace.
-      ancestorKey = makeAncestorKey(namespace, kind, ancestor);
-    }
-
-    @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
-      c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
-    }
-  }
-
-  /**
-   * Build a new datastore client.
-   */
-  static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
-    DatastoreOptions.Builder builder =
-        new DatastoreOptions.Builder()
-            .projectId(projectId)
-            .initializer(
-                new RetryHttpRequestInitializer()
-            );
-
-    Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
-    if (credential != null) {
-      builder.credential(credential);
-    }
-    return DatastoreFactory.get().create(builder.build());
-  }
-
-  /**
-   * Build a datastore query request.
-   */
-  private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
-    RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
-    if (namespace != null) {
-      requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
-    }
-    return requestBuilder.build();
-  }
-
-  /**
-   * Delete all entities with the given ancestor.
-   */
-  static void deleteAllEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
-    Datastore datastore = getDatastore(options, options.getProject());
-    Query query = V1Beta3TestUtil.makeAncestorKindQuery(
-        options.getKind(), options.getNamespace(), ancestor);
-
-    V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
-    V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new DeleteMutationBuilder());
-
-    long numEntities = 0;
-    while (reader.advance()) {
-      Entity entity = reader.getCurrent();
-      numEntities++;
-      writer.write(entity);
-    }
-
-    writer.close();
-    LOG.info("Successfully deleted {} entities", numEntities);
-  }
-
-  /**
-   * Returns the total number of entities for the given datastore.
-   */
-  static long countEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
-    // Read from datastore.
-    Datastore datastore = V1Beta3TestUtil.getDatastore(options, options.getProject());
-    Query query = V1Beta3TestUtil.makeAncestorKindQuery(
-        options.getKind(), options.getNamespace(), ancestor);
-
-    V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
-
-    long numEntitiesRead = 0;
-    while (reader.advance()) {
-      reader.getCurrent();
-      numEntitiesRead++;
-    }
-    return numEntitiesRead;
-  }
-
-  /**
-   * An interface to represent any datastore mutation operation.
-   * Mutation operations include insert, delete, upsert, and update.
-   */
-  interface MutationBuilder {
-    Mutation.Builder apply(Entity entity);
-  }
-
-  /**
-   * A MutationBuilder that performs an upsert operation.
-   */
-  static class UpsertMutationBuilder implements MutationBuilder {
-    public Mutation.Builder apply(Entity entity) {
-      return makeUpsert(entity);
-    }
-  }
-
-  /**
-   * A MutationBuilder that performs a delete operation.
-   */
-  static class DeleteMutationBuilder implements MutationBuilder {
-    public Mutation.Builder apply(Entity entity) {
-      return makeDelete(entity.getKey());
-    }
-  }
-
-  /**
-   * A helper class to write entities to datastore.
-   */
-  static class V1Beta3TestWriter {
-    private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestWriter.class);
-    // Limits the number of entities updated per batch
-    private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
-    // Number of times to retry on update failure
-    private static final int MAX_RETRIES = 5;
-    // Initial backoff time for exponential backoff between retry attempts.
-    private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
-    // Returns true if a Datastore key is complete. A key is complete if its last element
-    // has either an id or a name.
-    static boolean isValidKey(Key key) {
-      List<PathElement> elementList = key.getPathList();
-      if (elementList.isEmpty()) {
-        return false;
-      }
-      PathElement lastElement = elementList.get(elementList.size() - 1);
-      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
-    }
-
-    private final Datastore datastore;
-    private final MutationBuilder mutationBuilder;
-    private final List<Entity> entities = new ArrayList<>();
-
-    V1Beta3TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
-      this.datastore = datastore;
-      this.mutationBuilder = mutationBuilder;
-    }
-
-    void write(Entity value) throws Exception {
-      // Verify that the entity to write has a complete key.
-      if (!isValidKey(value.getKey())) {
-        throw new IllegalArgumentException(
-            "Entities to be written to the Datastore must have complete keys");
-      }
-
-      entities.add(value);
-
-      if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
-        flushBatch();
-      }
-    }
-
-    void close() throws Exception {
-      // flush any remaining entities
-      if (entities.size() > 0) {
-        flushBatch();
-      }
-    }
-
-    // commit the list of entities to datastore
-    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
-      LOG.info("Writing batch of {} entities", entities.size());
-      Sleeper sleeper = Sleeper.DEFAULT;
-      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
-      while (true) {
-        // Batch mutate entities.
-        try {
-          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
-          for (Entity entity: entities) {
-            commitRequest.addMutations(mutationBuilder.apply(entity));
-          }
-          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
-          datastore.commit(commitRequest.build());
-          // Break if the commit threw no exception.
-          break;
-        } catch (DatastoreException exception) {
-          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
-              exception.getMessage());
-          if (!BackOffUtils.next(sleeper, backoff)) {
-            LOG.error("Aborting after {} retries.", MAX_RETRIES);
-            throw exception;
-          }
-        }
-      }
-      LOG.info("Successfully wrote {} entities", entities.size());
-      entities.clear();
-    }
-  }
-
-  /**
-   * A helper class to read entities from datastore.
-   */
-  static class V1Beta3TestReader {
-    private static final int QUERY_BATCH_LIMIT = 500;
-    private final Datastore datastore;
-    private final Query query;
-    @Nullable
-    private final String namespace;
-    private boolean moreResults;
-    private java.util.Iterator<EntityResult> entities;
-    // Current batch of query results
-    private QueryResultBatch currentBatch;
-    private Entity currentEntity;
-
-    V1Beta3TestReader(Datastore datastore, Query query, @Nullable String namespace) {
-      this.datastore = datastore;
-      this.query = query;
-      this.namespace = namespace;
-    }
-
-    Entity getCurrent() {
-      return currentEntity;
-    }
-
-    boolean advance() throws IOException {
-      if (entities == null || (!entities.hasNext() && moreResults)) {
-        try {
-          entities = getIteratorAndMoveCursor();
-        } catch (DatastoreException e) {
-          throw new IOException(e);
-        }
-      }
-
-      if (entities == null || !entities.hasNext()) {
-        currentEntity = null;
-        return false;
-      }
-
-      currentEntity = entities.next().getEntity();
-      return true;
-    }
-
-    // Read the next batch of query results.
-    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
-      Query.Builder query = this.query.toBuilder().clone();
-      query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
-      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
-        query.setStartCursor(currentBatch.getEndCursor());
-      }
-
-      RunQueryRequest request = makeRequest(query.build(), namespace);
-      RunQueryResponse response = datastore.runQuery(request);
-
-      currentBatch = response.getBatch();
-
-      int numFetch = currentBatch.getEntityResultsCount();
-      // All indications from the API are that there are/may be more results.
-      moreResults = ((numFetch == QUERY_BATCH_LIMIT)
-              || (currentBatch.getMoreResults() == NOT_FINISHED));
-
-      // May receive a batch of 0 results if the number of records is a multiple
-      // of the request limit.
-      if (numFetch == 0) {
-        return null;
-      }
-
-      return currentBatch.getEntityResultsList().iterator();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
deleted file mode 100644
index 782065f..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.countEntities;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.CreateEntityFn;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.UUID;
-
-/**
- * End-to-end tests for Datastore V1Beta3.Write.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3WriteIT {
-  private V1Beta3TestOptions options;
-  private String ancestor;
-  private final long numEntities = 1000;
-
-  @Before
-  public void setup() {
-    PipelineOptionsFactory.register(V1Beta3TestOptions.class);
-    options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class);
-    ancestor = UUID.randomUUID().toString();
-  }
-
-  /**
-   * An end-to-end test for {@link V1Beta3.Write}.
-   *
-   * Write some test entities to datastore through a dataflow pipeline.
-   * Read and count all the entities. Verify that the count matches the
-   * number of entities written.
-   */
-  @Test
-  public void testE2EV1Beta3Write() throws Exception {
-    Pipeline p = Pipeline.create(options);
-
-    // Write to datastore
-    p.apply(CountingInput.upTo(numEntities))
-        .apply(ParDo.of(new CreateEntityFn(
-            options.getKind(), options.getNamespace(), ancestor)))
-        .apply(DatastoreIO.v1beta3().write().withProjectId(options.getProject()));
-
-    p.run();
-
-    // Count number of entities written to datastore.
-    long numEntitiesWritten = countEntities(options, ancestor);
-
-    assertEquals(numEntitiesWritten, numEntities);
-  }
-
-  @After
-  public void tearDown() throws Exception {
-    deleteAllEntities(options, ancestor);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
new file mode 100644
index 0000000..8fedc77
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.getDatastore;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeAncestorKey;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeEntity;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.UpsertMutationBuilder;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.V1TestWriter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.client.Datastore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for {@link DatastoreV1.Read}.
+ */
+@RunWith(JUnit4.class)
+public class V1ReadIT {
+  private V1TestOptions options;
+  private String ancestor;
+  private final long numEntities = 1000;
+
+  @Before
+  public void setup() {
+    PipelineOptionsFactory.register(V1TestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    ancestor = UUID.randomUUID().toString();
+  }
+
+  /**
+   * An end-to-end test for {@link DatastoreV1.Read}.
+   *
+   * Write some test entities to datastore and then run a dataflow pipeline that
+   * reads and counts the total number of entities. Verify that the count matches the
+   * number of entities written.
+   */
+  @Test
+  public void testE2EV1Read() throws Exception {
+    // Create entities and write them to datastore
+    writeEntitiesToDatastore(options, ancestor, numEntities);
+
+    // Read from datastore
+    Query query = V1TestUtil.makeAncestorKindQuery(
+        options.getKind(), options.getNamespace(), ancestor);
+
+    DatastoreV1.Read read = DatastoreIO.v1().read()
+        .withProjectId(options.getProject())
+        .withQuery(query)
+        .withNamespace(options.getNamespace());
+
+    // Count the total number of entities
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count = p
+        .apply(read)
+        .apply(Count.<Entity>globally());
+
+    PAssert.thatSingleton(count).isEqualTo(numEntities);
+    p.run();
+  }
+
+  // Creates entities and writes them to datastore.
+  private static void writeEntitiesToDatastore(V1TestOptions options, String ancestor,
+      long numEntities) throws Exception {
+    Datastore datastore = getDatastore(options, options.getProject());
+    // Write test entities to datastore
+    V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
+    Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
+
+    for (long i = 0; i < numEntities; i++) {
+      Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
+      writer.write(entity);
+    }
+    writer.close();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deleteAllEntities(options, ancestor);
+  }
+}
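
The read-and-count pattern above also works outside of JUnit. A minimal standalone sketch against the new v1 API (not part of this commit; it assumes the same org.apache.beam.sdk.io.gcp.datastore package, since V1TestOptions and V1TestUtil are package-private, and "my-ancestor" is a placeholder value):

    package org.apache.beam.sdk.io.gcp.datastore;

    import com.google.datastore.v1.Entity;
    import com.google.datastore.v1.Query;

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Count;

    /** A hypothetical standalone read pipeline; illustrative only. */
    public class V1ReadExample {
      public static void main(String[] args) {
        PipelineOptionsFactory.register(V1TestOptions.class);
        V1TestOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().as(V1TestOptions.class);

        // Scope the read to entities under a known ancestor, as the IT above does.
        Query query = V1TestUtil.makeAncestorKindQuery(
            options.getKind(), options.getNamespace(), "my-ancestor");

        Pipeline p = Pipeline.create(options);
        p.apply(DatastoreIO.v1().read()
                .withProjectId(options.getProject())
                .withQuery(query)
                .withNamespace(options.getNamespace()))
            .apply(Count.<Entity>globally());
        p.run();
      }
    }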

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
new file mode 100644
index 0000000..360855f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+import javax.annotation.Nullable;
+
+/**
+ * DatastoreV1-related pipeline options.
+ */
+public interface V1TestOptions extends TestPipelineOptions {
+  @Description("Project ID for the Datastore used by the tests")
+  @Default.String("apache-beam-testing")
+  String getProject();
+  void setProject(String value);
+
+  @Description("Datastore Entity kind")
+  @Default.String("beam-test")
+  String getKind();
+  void setKind(String value);
+
+  @Description("Datastore Namespace")
+  String getNamespace();
+  void setNamespace(@Nullable String value);
+}
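
These options follow the standard TestPipelineOptions pattern: register the interface, then bind values from command-line flags. A short sketch (the flag values are illustrative; --project and --kind may be omitted because of the @Default annotations above):

    PipelineOptionsFactory.register(V1TestOptions.class);
    V1TestOptions options = PipelineOptionsFactory
        .fromArgs(new String[] {
            "--project=my-gcp-project", "--kind=beam-test", "--namespace=itests"})
        .as(V1TestOptions.class);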

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
new file mode 100644
index 0000000..1e323ec
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Key.PathElement;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
+import com.google.datastore.v1.client.DatastoreFactory;
+import com.google.datastore.v1.client.DatastoreOptions;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+class V1TestUtil {
+  private static final Logger LOG = LoggerFactory.getLogger(V1TestUtil.class);
+
+  /**
+   * A helper function to create the ancestor key for all created and queried entities.
+   */
+  static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
+    Key.Builder keyBuilder = makeKey(kind, ancestor);
+    if (namespace != null) {
+      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+    }
+    return keyBuilder.build();
+  }
+
+  /**
+   * Build a datastore ancestor query for the specified kind, namespace and ancestor.
+   */
+  static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
+    Query.Builder q = Query.newBuilder();
+    q.addKindBuilder().setName(kind);
+    q.setFilter(makeFilter(
+        "__key__",
+        PropertyFilter.Operator.HAS_ANCESTOR,
+        makeValue(makeAncestorKey(namespace, kind, ancestor))));
+    return q.build();
+  }
+
+  /**
+   * Build an entity for the given ancestorKey, kind, namespace and value.
+   */
+  static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+    Entity.Builder entityBuilder = Entity.newBuilder();
+    Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
+    // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
+    // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
+    // we can simplify this code.
+    if (namespace != null) {
+      keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+    }
+
+    entityBuilder.setKey(keyBuilder.build());
+    entityBuilder.getMutableProperties().put("value", makeValue(value).build());
+    return entityBuilder.build();
+  }
+
+  /**
+   * A DoFn that creates an entity for each long number.
+   */
+  static class CreateEntityFn extends DoFn<Long, Entity> {
+    private final String kind;
+    @Nullable
+    private final String namespace;
+    private Key ancestorKey;
+
+    CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+      this.kind = kind;
+      this.namespace = namespace;
+      // Build the ancestor key for all created entities once, including the namespace.
+      ancestorKey = makeAncestorKey(namespace, kind, ancestor);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+    }
+  }
+
+  /**
+   * Build a new datastore client.
+   */
+  static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+    DatastoreOptions.Builder builder =
+        new DatastoreOptions.Builder()
+            .projectId(projectId)
+            .initializer(
+                new RetryHttpRequestInitializer()
+            );
+
+    Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
+    if (credential != null) {
+      builder.credential(credential);
+    }
+    return DatastoreFactory.get().create(builder.build());
+  }
+
+  /**
+   * Build a datastore query request.
+   */
+  private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
+    RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
+    if (namespace != null) {
+      requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
+    }
+    return requestBuilder.build();
+  }
+
+  /**
+   * Delete all entities with the given ancestor.
+   */
+  static void deleteAllEntities(V1TestOptions options, String ancestor) throws Exception {
+    Datastore datastore = getDatastore(options, options.getProject());
+    Query query = V1TestUtil.makeAncestorKindQuery(
+        options.getKind(), options.getNamespace(), ancestor);
+
+    V1TestReader reader = new V1TestReader(datastore, query, options.getNamespace());
+    V1TestWriter writer = new V1TestWriter(datastore, new DeleteMutationBuilder());
+
+    long numEntities = 0;
+    while (reader.advance()) {
+      Entity entity = reader.getCurrent();
+      numEntities++;
+      writer.write(entity);
+    }
+
+    writer.close();
+    LOG.info("Successfully deleted {} entities", numEntities);
+  }
+
+  /**
+   * Returns the total number of entities for the given datastore.
+   */
+  static long countEntities(V1TestOptions options, String ancestor) throws Exception {
+    // Read from datastore.
+    Datastore datastore = V1TestUtil.getDatastore(options, options.getProject());
+    Query query = V1TestUtil.makeAncestorKindQuery(
+        options.getKind(), options.getNamespace(), ancestor);
+
+    V1TestReader reader = new V1TestReader(datastore, query, options.getNamespace());
+
+    long numEntitiesRead = 0;
+    while (reader.advance()) {
+      reader.getCurrent();
+      numEntitiesRead++;
+    }
+    return numEntitiesRead;
+  }
+
+  /**
+   * An interface to represent any datastore mutation operation.
+   * Mutation operations include insert, delete, upsert, and update.
+   */
+  interface MutationBuilder {
+    Mutation.Builder apply(Entity entity);
+  }
+
+  /**
+   * A MutationBuilder that performs an upsert operation.
+   */
+  static class UpsertMutationBuilder implements MutationBuilder {
+    public Mutation.Builder apply(Entity entity) {
+      return makeUpsert(entity);
+    }
+  }
+
+  /**
+   * A MutationBuilder that performs a delete operation.
+   */
+  static class DeleteMutationBuilder implements MutationBuilder {
+    public Mutation.Builder apply(Entity entity) {
+      return makeDelete(entity.getKey());
+    }
+  }
+
+  /**
+   * A helper class to write entities to datastore.
+   */
+  static class V1TestWriter {
+    private static final Logger LOG = LoggerFactory.getLogger(V1TestWriter.class);
+    // Limits the number of entities updated per batch
+    private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+    // Number of times to retry on update failure
+    private static final int MAX_RETRIES = 5;
+    // Initial backoff time for exponential backoff between retry attempts.
+    private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+    // Returns true if a Datastore key is complete. A key is complete if its last element
+    // has either an id or a name.
+    static boolean isValidKey(Key key) {
+      List<PathElement> elementList = key.getPathList();
+      if (elementList.isEmpty()) {
+        return false;
+      }
+      PathElement lastElement = elementList.get(elementList.size() - 1);
+      return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
+    }
+
+    private final Datastore datastore;
+    private final MutationBuilder mutationBuilder;
+    private final List<Entity> entities = new ArrayList<>();
+
+    V1TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
+      this.datastore = datastore;
+      this.mutationBuilder = mutationBuilder;
+    }
+
+    void write(Entity value) throws Exception {
+      // Verify that the entity to write has a complete key.
+      if (!isValidKey(value.getKey())) {
+        throw new IllegalArgumentException(
+            "Entities to be written to the Datastore must have complete keys");
+      }
+
+      entities.add(value);
+
+      if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
+        flushBatch();
+      }
+    }
+
+    void close() throws Exception {
+      // flush any remaining entities
+      if (entities.size() > 0) {
+        flushBatch();
+      }
+    }
+
+    // commit the list of entities to datastore
+    private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+      LOG.info("Writing batch of {} entities", entities.size());
+      Sleeper sleeper = Sleeper.DEFAULT;
+      BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+      while (true) {
+        // Batch mutate entities.
+        try {
+          CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+          for (Entity entity: entities) {
+            commitRequest.addMutations(mutationBuilder.apply(entity));
+          }
+          commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+          datastore.commit(commitRequest.build());
+          // Break if the commit threw no exception.
+          break;
+        } catch (DatastoreException exception) {
+          LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+              exception.getMessage());
+          if (!BackOffUtils.next(sleeper, backoff)) {
+            LOG.error("Aborting after {} retries.", MAX_RETRIES);
+            throw exception;
+          }
+        }
+      }
+      LOG.info("Successfully wrote {} entities", entities.size());
+      entities.clear();
+    }
+  }
+
+  /**
+   * A helper class to read entities from datastore.
+   */
+  static class V1TestReader {
+    private static final int QUERY_BATCH_LIMIT = 500;
+    private final Datastore datastore;
+    private final Query query;
+    @Nullable
+    private final String namespace;
+    private boolean moreResults;
+    private java.util.Iterator<EntityResult> entities;
+    // Current batch of query results
+    private QueryResultBatch currentBatch;
+    private Entity currentEntity;
+
+    V1TestReader(Datastore datastore, Query query, @Nullable String namespace) {
+      this.datastore = datastore;
+      this.query = query;
+      this.namespace = namespace;
+    }
+
+    Entity getCurrent() {
+      return currentEntity;
+    }
+
+    boolean advance() throws IOException {
+      if (entities == null || (!entities.hasNext() && moreResults)) {
+        try {
+          entities = getIteratorAndMoveCursor();
+        } catch (DatastoreException e) {
+          throw new IOException(e);
+        }
+      }
+
+      if (entities == null || !entities.hasNext()) {
+        currentEntity = null;
+        return false;
+      }
+
+      currentEntity = entities.next().getEntity();
+      return true;
+    }
+
+    // Read the next batch of query results.
+    private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
+      Query.Builder query = this.query.toBuilder().clone();
+      query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
+      if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+        query.setStartCursor(currentBatch.getEndCursor());
+      }
+
+      RunQueryRequest request = makeRequest(query.build(), namespace);
+      RunQueryResponse response = datastore.runQuery(request);
+
+      currentBatch = response.getBatch();
+
+      int numFetch = currentBatch.getEntityResultsCount();
+      // All indications from the API are that there are/may be more results.
+      moreResults = ((numFetch == QUERY_BATCH_LIMIT)
+          || (currentBatch.getMoreResults() == NOT_FINISHED));
+
+      // May receive a batch of 0 results if the number of records is a multiple
+      // of the request limit.
+      if (numFetch == 0) {
+        return null;
+      }
+
+      return currentBatch.getEntityResultsList().iterator();
+    }
+  }
+}
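
Together, V1TestWriter and V1TestReader form the seed-then-verify loop that countEntities and deleteAllEntities above are built from. A sketch of driving them directly (same package and the matching com.google.datastore.v1 imports assumed; the ancestor value is a placeholder):

    static long seedAndCount(V1TestOptions options) throws Exception {
      Datastore datastore = V1TestUtil.getDatastore(options, options.getProject());
      Key ancestorKey = V1TestUtil.makeAncestorKey(
          options.getNamespace(), options.getKind(), "my-ancestor");

      // Seed: batch-upsert entities; close() flushes the final partial batch.
      V1TestUtil.V1TestWriter writer =
          new V1TestUtil.V1TestWriter(datastore, new V1TestUtil.UpsertMutationBuilder());
      for (long i = 0; i < 10; i++) {
        writer.write(V1TestUtil.makeEntity(
            i, ancestorKey, options.getKind(), options.getNamespace()));
      }
      writer.close();

      // Verify: the reader pages through results 500 at a time via the batch end cursor.
      Query query = V1TestUtil.makeAncestorKindQuery(
          options.getKind(), options.getNamespace(), "my-ancestor");
      V1TestUtil.V1TestReader reader =
          new V1TestUtil.V1TestReader(datastore, query, options.getNamespace());
      long count = 0;
      while (reader.advance()) {
        reader.getCurrent();
        count++;
      }
      return count;
    }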

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
new file mode 100644
index 0000000..b97c05c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.countEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.CreateEntityFn;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for {@link DatastoreV1.Write}.
+ */
+@RunWith(JUnit4.class)
+public class V1WriteIT {
+  private V1TestOptions options;
+  private String ancestor;
+  private final long numEntities = 1000;
+
+  @Before
+  public void setup() {
+    PipelineOptionsFactory.register(V1TestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    ancestor = UUID.randomUUID().toString();
+  }
+
+  /**
+   * An end-to-end test for {@link DatastoreV1.Write}.
+   *
+   * Write some test entities to datastore through a dataflow pipeline.
+   * Read and count all the entities. Verify that the count matches the
+   * number of entities written.
+   */
+  @Test
+  public void testE2EV1Write() throws Exception {
+    Pipeline p = Pipeline.create(options);
+
+    // Write to datastore
+    p.apply(CountingInput.upTo(numEntities))
+        .apply(ParDo.of(new CreateEntityFn(
+            options.getKind(), options.getNamespace(), ancestor)))
+        .apply(DatastoreIO.v1().write().withProjectId(options.getProject()));
+
+    p.run();
+
+    // Count number of entities written to datastore.
+    long numEntitiesWritten = countEntities(options, ancestor);
+
+    assertEquals(numEntities, numEntitiesWritten);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deleteAllEntities(options, ancestor);
+  }
+}
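
For user pipelines, the migration this commit completes is mechanical: swap the com.google.datastore.v1beta3.* protobuf imports for com.google.datastore.v1.* and change the DatastoreIO factory method, exactly as the two write ITs above show:

    // Before (v1beta3):
    p.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));
    // After (v1):
    p.apply(DatastoreIO.v1().write().withProjectId(projectId));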