You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/08/20 01:10:49 UTC
[1/4] incubator-beam git commit: Closes #858
Repository: incubator-beam
Updated Branches:
refs/heads/master c57643f52 -> 921d0b2e7
Closes #858
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/921d0b2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/921d0b2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/921d0b2e
Branch: refs/heads/master
Commit: 921d0b2e752e7453332af651170142b1cd9a0cfa
Parents: c57643f 54e4cb1
Author: Dan Halperin <dh...@google.com>
Authored: Fri Aug 19 18:10:39 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 18:10:39 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 4 +-
.../beam/examples/complete/AutoComplete.java | 12 +-
.../examples/cookbook/DatastoreWordCount.java | 24 +-
pom.xml | 8 +-
runners/google-cloud-dataflow-java/pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 4 +-
.../beam/sdk/io/gcp/datastore/DatastoreIO.java | 8 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 1034 ++++++++++++++++++
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 1033 -----------------
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 792 ++++++++++++++
.../sdk/io/gcp/datastore/V1Beta3ReadIT.java | 114 --
.../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 792 --------------
.../io/gcp/datastore/V1Beta3TestOptions.java | 44 -
.../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 382 -------
.../sdk/io/gcp/datastore/V1Beta3WriteIT.java | 85 --
.../beam/sdk/io/gcp/datastore/V1ReadIT.java | 114 ++
.../sdk/io/gcp/datastore/V1TestOptions.java | 44 +
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 382 +++++++
.../beam/sdk/io/gcp/datastore/V1WriteIT.java | 85 ++
19 files changed, 2482 insertions(+), 2481 deletions(-)
----------------------------------------------------------------------
[4/4] incubator-beam git commit: DatastoreIO v1beta3 to v1
Posted by dh...@apache.org.
DatastoreIO v1beta3 to v1
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/54e4cb12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/54e4cb12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/54e4cb12
Branch: refs/heads/master
Commit: 54e4cb123187992b64b1580869ae5857f0ef613b
Parents: c57643f
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Fri Aug 19 16:18:37 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Fri Aug 19 18:10:39 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 4 +-
.../beam/examples/complete/AutoComplete.java | 12 +-
.../examples/cookbook/DatastoreWordCount.java | 24 +-
pom.xml | 8 +-
runners/google-cloud-dataflow-java/pom.xml | 2 +-
sdks/java/io/google-cloud-platform/pom.xml | 4 +-
.../beam/sdk/io/gcp/datastore/DatastoreIO.java | 8 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 1034 ++++++++++++++++++
.../beam/sdk/io/gcp/datastore/V1Beta3.java | 1033 -----------------
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 792 ++++++++++++++
.../sdk/io/gcp/datastore/V1Beta3ReadIT.java | 114 --
.../beam/sdk/io/gcp/datastore/V1Beta3Test.java | 792 --------------
.../io/gcp/datastore/V1Beta3TestOptions.java | 44 -
.../sdk/io/gcp/datastore/V1Beta3TestUtil.java | 382 -------
.../sdk/io/gcp/datastore/V1Beta3WriteIT.java | 85 --
.../beam/sdk/io/gcp/datastore/V1ReadIT.java | 114 ++
.../sdk/io/gcp/datastore/V1TestOptions.java | 44 +
.../beam/sdk/io/gcp/datastore/V1TestUtil.java | 382 +++++++
.../beam/sdk/io/gcp/datastore/V1WriteIT.java | 85 ++
19 files changed, 2482 insertions(+), 2481 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 21d7a3a..096bc4e 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -323,12 +323,12 @@
<dependency>
<groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-proto-client</artifactId>
+ <artifactId>datastore-v1-proto-client</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-protos</artifactId>
+ <artifactId>datastore-v1-protos</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index baae100..120c64f 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -18,8 +18,8 @@
package org.apache.beam.examples.complete;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
@@ -59,9 +59,9 @@ import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.MoreObjects;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Value;
import org.joda.time.Duration;
@@ -488,7 +488,7 @@ public class AutoComplete {
toWrite
.apply("FormatForDatastore", ParDo.of(new FormatForDatastore(options.getKind(),
options.getDatastoreAncestorKey())))
- .apply(DatastoreIO.v1beta3().write().withProjectId(MoreObjects.firstNonNull(
+ .apply(DatastoreIO.v1().write().withProjectId(MoreObjects.firstNonNull(
options.getOutputProject(), options.getProject())));
}
if (options.getOutputToBigQuery()) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 21220b8..215e2ff 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -17,16 +17,16 @@
*/
package org.apache.beam.examples.cookbook;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.getString;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+import static com.google.datastore.v1.client.DatastoreHelper.getString;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
import org.apache.beam.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
@@ -36,11 +36,11 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.Value;
import java.util.Map;
import java.util.UUID;
@@ -194,7 +194,7 @@ public class DatastoreWordCount {
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.Read.from(options.getInput()))
.apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
- .apply(DatastoreIO.v1beta3().write().withProjectId(options.getProject()));
+ .apply(DatastoreIO.v1().write().withProjectId(options.getProject()));
p.run();
}
@@ -225,7 +225,7 @@ public class DatastoreWordCount {
Query query = makeAncestorKindQuery(options);
// For Datastore sources, the read namespace can be set on the entire query.
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
+ DatastoreV1.Read read = DatastoreIO.v1().read()
.withProjectId(options.getProject())
.withQuery(query)
.withNamespace(options.getNamespace());
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 58adbe7..f9e0479 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,8 +107,8 @@
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
<dataflow.version>v1b3-rev36-1.22.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
- <datastore.client.version>1.0.0-beta.2</datastore.client.version>
- <datastore.proto.version>1.0.0-beta</datastore.proto.version>
+ <datastore.client.version>1.1.0</datastore.client.version>
+ <datastore.proto.version>1.0.1</datastore.proto.version>
<google-auto-service.version>1.0-rc2</google-auto-service.version>
<google-auto-value.version>1.1</google-auto-value.version>
<google-clients.version>1.22.0</google-clients.version>
@@ -450,13 +450,13 @@
<dependency>
<groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-proto-client</artifactId>
+ <artifactId>datastore-v1-proto-client</artifactId>
<version>${datastore.client.version}</version>
</dependency>
<dependency>
<groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-protos</artifactId>
+ <artifactId>datastore-v1-protos</artifactId>
<version>${datastore.proto.version}</version>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 00b5a9b..0044823 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -409,7 +409,7 @@
<dependency>
<groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-protos</artifactId>
+ <artifactId>datastore-v1-protos</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/pom.xml b/sdks/java/io/google-cloud-platform/pom.xml
index 1596a66..8075335 100644
--- a/sdks/java/io/google-cloud-platform/pom.xml
+++ b/sdks/java/io/google-cloud-platform/pom.xml
@@ -115,12 +115,12 @@
<dependency>
<groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-proto-client</artifactId>
+ <artifactId>datastore-v1-proto-client</artifactId>
</dependency>
<dependency>
<groupId>com.google.cloud.datastore</groupId>
- <artifactId>datastore-v1beta3-protos</artifactId>
+ <artifactId>datastore-v1-protos</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
index bde0aba..5abf015 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreIO.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.annotations.Experimental;
* <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> over different
* versions of the Datastore Client libraries.
*
- * <p>To use the v1beta3 version see {@link V1Beta3}.
+ * <p>To use the v1 version see {@link DatastoreV1}.
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class DatastoreIO {
@@ -32,10 +32,10 @@ public class DatastoreIO {
private DatastoreIO() {}
/**
- * Returns a {@link V1Beta3} that provides an API for accessing Datastore through v1beta3 version
+ * Returns a {@link DatastoreV1} that provides an API for accessing Datastore through v1 version
* of Datastore Client library.
*/
- public static V1Beta3 v1beta3() {
- return new V1Beta3();
+ public static DatastoreV1 v1() {
+ return new DatastoreV1();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
new file mode 100644
index 0000000..852595a
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -0,0 +1,1034 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Verify.verify;
+import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.transforms.Values;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableList;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Key.PathElement;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
+import com.google.datastore.v1.client.DatastoreFactory;
+import com.google.datastore.v1.client.DatastoreHelper;
+import com.google.datastore.v1.client.DatastoreOptions;
+import com.google.datastore.v1.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NoSuchElementException;
+import javax.annotation.Nullable;
+
+/**
+ * <p>{@link DatastoreV1} provides an API to Read, Write and Delete {@link PCollection PCollections}
+ * of <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1
+ * {@link Entity} objects.
+ *
+ * <p>This API currently requires an authentication workaround. To use {@link DatastoreV1}, users
+ * must use the {@code gcloud} command line tool to get credentials for Datastore:
+ * <pre>
+ * $ gcloud auth login
+ * </pre>
+ *
+ * <p>To read a {@link PCollection} from a query to Datastore, use {@link DatastoreV1#read} and
+ * its methods {@link DatastoreV1.Read#withProjectId} and {@link DatastoreV1.Read#withQuery} to
+ * specify the project to query and the query to read from. You can optionally provide a namespace
+ * to query within using {@link DatastoreV1.Read#withNamespace}. You could also optionally specify
+ * how many splits you want for the query using {@link DatastoreV1.Read#withNumQuerySplits}.
+ *
+ * <p>For example:
+ *
+ * <pre> {@code
+ * // Read a query from Datastore
+ * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
+ * Query query = ...;
+ * String projectId = "...";
+ *
+ * Pipeline p = Pipeline.create(options);
+ * PCollection<Entity> entities = p.apply(
+ * DatastoreIO.v1().read()
+ * .withProjectId(projectId)
+ * .withQuery(query));
+ * } </pre>
+ *
+ * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
+ * many workers. However, when the {@link Query} is configured with a limit using
+ * {@link com.google.datastore.v1.Query.Builder#setLimit(Int32Value)}, then
+ * all returned results will be read by a single Dataflow worker in order to ensure correct data.
+ *
+ * <p>To write a {@link PCollection} to a Datastore, use {@link DatastoreV1#write},
+ * specifying the Cloud Datastore project to write to:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1().write().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use
+ * {@link DatastoreV1#deleteEntity()}, specifying the Cloud Datastore project to delete from:
+ *
+ * <pre> {@code
+ * PCollection<Entity> entities = ...;
+ * entities.apply(DatastoreIO.v1().deleteEntity().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore,
+ * use {@link DatastoreV1#deleteKey}, specifying the Cloud Datastore project to delete from:
+ *
+ * <pre> {@code
+ * PCollection<Key> keys = ...;
+ * keys.apply(DatastoreIO.v1().deleteKey().withProjectId(projectId));
+ * p.run();
+ * } </pre>
+ *
+ * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete
+ * {@link Key Keys}. Complete {@code Keys} specify the {@code name} or {@code id} of the
+ * {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than
+ * the {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}.
+ *
+ * <pre>{@code
+ * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
+ * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+ * }</pre>
+ *
+ * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please
+ * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties,
+ * and Keys</a> for more information about {@code Entity} keys.
+ *
+ * <p><h3>Permissions</h3>
+ * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
+ * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
+ * more details.
+ *
+ * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up
+ * </a>for security and permission related information specific to Datastore.
+ *
+ * @see org.apache.beam.sdk.runners.PipelineRunner
+ */
+@Experimental(Experimental.Kind.SOURCE_SINK)
+public class DatastoreV1 {
+
+ // Package-private on purpose: code outside this package cannot instantiate this class directly.
+ DatastoreV1() {}
+
+ /**
+  * Maximum number of mutations sent in one batch. Datastore has a limit of 500 mutations per
+  * batch operation, so changes are flushed to Datastore every 500 entities.
+  */
+ @VisibleForTesting
+ static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+
+ /**
+  * Creates an unconfigured {@link DatastoreV1.Read} builder. The {@code projectId} and
+  * {@code query} must be supplied through {@link DatastoreV1.Read#withProjectId} and
+  * {@link DatastoreV1.Read#withQuery}; {@code namespace} and {@code numQuerySplits} are optional
+  * and set through {@link DatastoreV1.Read#withNamespace} and
+  * {@link DatastoreV1.Read#withNumQuerySplits}.
+  */
+ public DatastoreV1.Read read() {
+   final String unsetProjectId = null;
+   final Query unsetQuery = null;
+   final String unsetNamespace = null;
+   // Zero requested splits means the split count is chosen at runtime.
+   final int unsetNumQuerySplits = 0;
+   return new Read(unsetProjectId, unsetQuery, unsetNamespace, unsetNumQuerySplits);
+ }
+
+ /**
+ * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity}
+ * objects.
+ *
+ * @see DatastoreIO
+ */
+ public static class Read extends PTransform<PBegin, PCollection<Entity>> {
+ private static final Logger LOG = LoggerFactory.getLogger(Read.class);
+
+ /** An upper bound on the number of splits for a query; larger requested values are capped. */
+ public static final int NUM_QUERY_SPLITS_MAX = 50000;
+
+ /** A lower bound on the number of splits when the count is estimated from the query size. */
+ static final int NUM_QUERY_SPLITS_MIN = 12;
+
+ /** Default bundle size of 64MB; the estimated query size is divided by this to get splits. */
+ static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024;
+
+ /**
+  * Maximum number of results to request per query.
+  *
+  * <p>Must be set, or it may result in an I/O error when querying Cloud Datastore.
+  */
+ static final int QUERY_BATCH_LIMIT = 500;
+
+ // Project to read from; null until a project has been configured on the builder.
+ @Nullable
+ private final String projectId;
+
+ // Query whose results are read; null until a query has been configured on the builder.
+ @Nullable
+ private final Query query;
+
+ // Optional namespace to query within; null when none was configured.
+ @Nullable
+ private final String namespace;
+
+ // Requested number of query splits; 0 means the count is chosen dynamically at runtime.
+ private final int numQuerySplits;
+
+ /**
+  * Computes the number of splits to be performed on the given query by querying the estimated
+  * size from Datastore.
+  *
+  * <p>The estimated size is divided by {@link #DEFAULT_BUNDLE_SIZE_BYTES}, rounded, and capped at
+  * {@link #NUM_QUERY_SPLITS_MAX}. The result is never smaller than
+  * {@link #NUM_QUERY_SPLITS_MIN}, which is also the fallback when the size estimate cannot be
+  * obtained for any reason.
+  *
+  * @param datastore client used to look up the size estimate
+  * @param query the query to be split
+  * @param namespace optional namespace the query runs in
+  * @return a split count between {@code NUM_QUERY_SPLITS_MIN} and {@code NUM_QUERY_SPLITS_MAX}
+  */
+ static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
+   int numSplits;
+   try {
+     long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
+     numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX,
+         Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES));
+   } catch (Exception e) {
+     // Deliberately best-effort: any failure to estimate just means we use the minimum.
+     LOG.warn("Failed to fetch estimatedSizeBytes for query: {}", query, e);
+     // Fallback in case estimated size is unavailable.
+     numSplits = NUM_QUERY_SPLITS_MIN;
+   }
+   return Math.max(numSplits, NUM_QUERY_SPLITS_MIN);
+ }
+
+ /**
+  * Get the estimated size of the data returned by the given query.
+  *
+  * <p>Datastore provides no way to get a good estimate of how large the result of a query
+  * will be. As an approximation, this fetches the latest statistics of the entity kind being
+  * queried, using the {@code __Stat_Kind__} system table (or {@code __Stat_Ns_Kind__} when a
+  * namespace is given), assuming exactly 1 kind is specified in the query.
+  *
+  * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
+  *
+  * @param datastore client used to run the statistics query
+  * @param query the user query; only its first kind is used
+  * @param namespace optional namespace; selects the per-namespace statistics kind when set
+  * @return the {@code entity_bytes} value of the most recent statistics entity for the kind
+  * @throws DatastoreException if running the statistics query fails
+  * @throws NoSuchElementException if no statistics entity exists for the kind, or it has no
+  *     {@code entity_bytes} property
+  */
+ static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
+     throws DatastoreException {
+   String ourKind = query.getKind(0).getName();
+   // Per-namespace statistics live in their own system kind, named __Stat_Ns_Kind__.
+   String statKind = (namespace == null) ? "__Stat_Kind__" : "__Stat_Ns_Kind__";
+   Query.Builder queryBuilder = Query.newBuilder();
+   queryBuilder.addKindBuilder().setName(statKind);
+   queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
+
+   // Get the latest statistics
+   queryBuilder.addOrder(makeOrder("timestamp", DESCENDING));
+   queryBuilder.setLimit(Int32Value.newBuilder().setValue(1));
+
+   RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+
+   long startMillis = System.currentTimeMillis();
+   RunQueryResponse response = datastore.runQuery(request);
+   LOG.debug("Query for per-kind statistics took {}ms",
+       System.currentTimeMillis() - startMillis);
+
+   QueryResultBatch batch = response.getBatch();
+   if (batch.getEntityResultsCount() == 0) {
+     throw new NoSuchElementException(
+         "Datastore statistics for kind " + ourKind + " unavailable");
+   }
+   Entity entity = batch.getEntityResults(0).getEntity();
+   // Fail with a descriptive error instead of a bare NullPointerException.
+   if (!entity.getProperties().containsKey("entity_bytes")) {
+     throw new NoSuchElementException(
+         "Datastore statistics for kind " + ourKind + " have no entity_bytes property");
+   }
+   return entity.getProperties().get("entity_bytes").getIntegerValue();
+ }
+
+ /**
+  * Wraps {@code query} in a {@link RunQueryRequest}. When {@code namespace} is non-null it is set
+  * as the namespace id of the request's partition id.
+  */
+ static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
+   RunQueryRequest.Builder builder = RunQueryRequest.newBuilder().setQuery(query);
+   if (namespace == null) {
+     return builder.build();
+   }
+   builder.getPartitionIdBuilder().setNamespaceId(namespace);
+   return builder.build();
+ }
+
+ /**
+  * Asks {@code querySplitter} to split {@code query} into {@code numSplits} queries. If a
+  * {@code namespace} is given it is carried in the partition id passed to the splitter, so that
+  * splits are calculated for that namespace.
+  */
+ private static List<Query> splitQuery(Query query, @Nullable String namespace,
+     Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException {
+   PartitionId partition = (namespace == null)
+       ? PartitionId.newBuilder().build()
+       : PartitionId.newBuilder().setNamespaceId(namespace).build();
+   return querySplitter.getSplits(query, partition, numSplits, datastore);
+ }
+
+ /**
+  * Stores the given configuration as-is, without validation. Only {@code namespace} is genuinely
+  * optional: {@code projectId} and {@code query} may be {@code null} while the transform is
+  * still being built, but an error will be thrown if they are {@code null} at instantiation
+  * time.
+  */
+ private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace,
+     int numQuerySplits) {
+   this.numQuerySplits = numQuerySplits;
+   this.namespace = namespace;
+   this.query = query;
+   this.projectId = projectId;
+ }
+
+ /**
+  * Returns a copy of this {@link DatastoreV1.Read} that reads from the Datastore of the given
+  * project. A {@code null} {@code projectId} is rejected.
+  */
+ public DatastoreV1.Read withProjectId(String projectId) {
+   String checkedProjectId = checkNotNull(projectId, "projectId");
+   return new Read(checkedProjectId, query, namespace, numQuerySplits);
+ }
+
+ /**
+  * Returns a copy of this {@link DatastoreV1.Read} that reads the results of the given query.
+  * The query must be non-null and, if it carries a limit, that limit must be positive.
+  *
+  * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
+  * across many workers. However, when the {@link Query} is configured with a limit using
+  * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
+  * to ensure correct results.
+  */
+ public DatastoreV1.Read withQuery(Query query) {
+   checkNotNull(query, "query");
+   boolean limitIsAcceptable = !query.hasLimit() || query.getLimit().getValue() > 0;
+   checkArgument(limitIsAcceptable,
+       "Invalid query limit %s: must be positive", query.getLimit().getValue());
+   return new Read(projectId, query, namespace, numQuerySplits);
+ }
+
+ /**
+  * Returns a copy of this {@link DatastoreV1.Read} that reads from the given namespace. The
+  * value is stored without any check.
+  */
+ public DatastoreV1.Read withNamespace(String namespace) {
+   return new Read(projectId, query, namespace, numQuerySplits);
+ }
+
+ /**
+  * Returns a copy of this {@link DatastoreV1.Read} that splits the {@code query} into
+  * {@code numQuerySplits} pieces when reading.
+  *
+  * <p>How the value is interpreted:
+  * <ul>
+  *   <li>Any value less than or equal to 0 will be ignored, and the number of splits will be
+  *   chosen dynamically at runtime based on the query data size.
+  *   <li>Any value greater than {@link Read#NUM_QUERY_SPLITS_MAX} will be capped at
+  *   {@code NUM_QUERY_SPLITS_MAX}.
+  *   <li>If the {@code query} has a user limit set, then {@code numQuerySplits} will be
+  *   ignored and no split will be performed.
+  *   <li>Under certain cases Cloud Datastore is unable to split query to the requested number of
+  *   splits. In such cases we just use whatever the Datastore returns.
+  * </ul>
+  */
+ public DatastoreV1.Read withNumQuerySplits(int numQuerySplits) {
+   int clampedSplits;
+   if (numQuerySplits <= 0) {
+     clampedSplits = 0;
+   } else if (numQuerySplits > NUM_QUERY_SPLITS_MAX) {
+     clampedSplits = NUM_QUERY_SPLITS_MAX;
+   } else {
+     clampedSplits = numQuerySplits;
+   }
+   return new Read(projectId, query, namespace, clampedSplits);
+ }
+
+ /** Returns the configured query, or {@code null} if none has been set yet. */
+ @Nullable
+ public Query getQuery() {
+   return this.query;
+ }
+
+ /** Returns the configured project id, or {@code null} if none has been set yet. */
+ @Nullable
+ public String getProjectId() {
+   return this.projectId;
+ }
+
+ /** Returns the configured namespace, or {@code null} if none has been set. */
+ @Nullable
+ public String getNamespace() {
+   return this.namespace;
+ }
+
+
+ /**
+  * {@inheritDoc}
+  */
+ @Override
+ public PCollection<Entity> apply(PBegin input) {
+   V1Options v1Options = V1Options.from(getProjectId(), getQuery(), getNamespace());
+
+   /*
+    * The read is expanded into three stages:
+    *   1. A singleton holding the user provided {@code query} is created and passed through a
+    *      {@link ParDo} that splits it into {@code numQuerySplits} queries, keying each split
+    *      query with a unique {@code Integer}. This yields a
+    *      {@code PCollection<KV<Integer, Query>>}.
+    *
+    *      When {@code numQuerySplits} is less than or equal to 0, the number of splits is
+    *      computed dynamically based on the size of the data for the {@code query}.
+    *
+    *   2. A {@link GroupByKey} shards that collection. The queries are then pulled out of the
+    *      {@code KV<Integer, Iterable<Query>>} values and flattened into a
+    *      {@code PCollection<Query>}.
+    *
+    *   3. A final {@code ParDo} reads the entities for each query, producing the resulting
+    *      {@code PCollection<Entity>}.
+    */
+   PCollection<KV<Integer, Query>> keyedSplits = input
+       .apply(Create.of(query))
+       .apply(ParDo.of(new SplitQueryFn(v1Options, numQuerySplits)));
+
+   PCollection<Iterable<Query>> groupedSplits = keyedSplits
+       .apply(GroupByKey.<Integer, Query>create())
+       .apply(Values.<Iterable<Query>>create());
+
+   PCollection<Query> shardedQueries = groupedSplits.apply(Flatten.<Query>iterables());
+
+   return shardedQueries.apply(ParDo.of(new ReadFn(v1Options)));
+ }
+
+ /** Fails with a {@link NullPointerException} when {@code projectId} or {@code query} is unset. */
+ @Override
+ public void validate(PBegin input) {
+   // The project is checked before the query; the namespace is not checked here.
+   checkNotNull(this.projectId, "projectId");
+   checkNotNull(this.query, "query");
+ }
+
+ /**
+  * Registers the configured project, namespace and query as display data.
+  *
+  * <p>Each item is skipped when its value is {@code null}. The query is only converted to a
+  * string when it is present, because it may still be unset when display data is collected.
+  */
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+   super.populateDisplayData(builder);
+   // query is @Nullable; calling toString() on it unguarded would throw a NullPointerException.
+   String queryText = (query == null) ? null : query.toString();
+   builder
+       .addIfNotNull(DisplayData.item("projectId", projectId)
+           .withLabel("ProjectId"))
+       .addIfNotNull(DisplayData.item("namespace", namespace)
+           .withLabel("Namespace"))
+       .addIfNotNull(DisplayData.item("query", queryText)
+           .withLabel("Query"));
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("projectId", projectId)
+ .add("query", query)
+ .add("namespace", namespace)
+ .toString();
+ }
+
+ /**
+ * A class for v1 Datastore related options.
+ */
+ @VisibleForTesting
+ static class V1Options implements Serializable {
+ private final Query query;
+ private final String projectId;
+ @Nullable
+ private final String namespace;
+
+ private V1Options(String projectId, Query query, @Nullable String namespace) {
+ this.projectId = checkNotNull(projectId, "projectId");
+ this.query = checkNotNull(query, "query");
+ this.namespace = namespace;
+ }
+
+ public static V1Options from(String projectId, Query query, @Nullable String namespace) {
+ return new V1Options(projectId, query, namespace);
+ }
+
+ public Query getQuery() {
+ return query;
+ }
+
+ public String getProjectId() {
+ return projectId;
+ }
+
+ @Nullable
+ public String getNamespace() {
+ return namespace;
+ }
+ }
+
+ /**
+  * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
+  * keys and outputs them as {@link KV}.
+  *
+  * <p>A query that carries a user-set limit is emitted unsplit under key 1. If splitting throws,
+  * a warning is logged and the original query is emitted as the only split.
+  */
+ @VisibleForTesting
+ static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
+   private final V1Options options;
+   // Number of splits to make for a given query; values <= 0 mean "estimate at runtime".
+   private final int numSplits;
+
+   private final V1DatastoreFactory datastoreFactory;
+   // Datastore client; transient, assigned in startBundle.
+   private transient Datastore datastore;
+   // Query splitter; transient, assigned in startBundle.
+   private transient QuerySplitter querySplitter;
+
+   /** Creates a function that uses a default {@link V1DatastoreFactory}. */
+   public SplitQueryFn(V1Options options, int numSplits) {
+     this(options, numSplits, new V1DatastoreFactory());
+   }
+
+   /** Creates a function with an explicit factory, so tests can substitute their own. */
+   @VisibleForTesting
+   SplitQueryFn(V1Options options, int numSplits,
+       V1DatastoreFactory datastoreFactory) {
+     this.options = options;
+     this.numSplits = numSplits;
+     this.datastoreFactory = datastoreFactory;
+   }
+
+   /** Obtains the Datastore client and the query splitter from the factory. */
+   @StartBundle
+   public void startBundle(Context c) throws Exception {
+     // Go through the accessor, like every other use of the options in this file.
+     datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
+     querySplitter = datastoreFactory.getQuerySplitter();
+   }
+
+   /** Splits the incoming query and emits each sub-query under a key counting up from 1. */
+   @ProcessElement
+   public void processElement(ProcessContext c) throws Exception {
+     int key = 1;
+     Query query = c.element();
+
+     // If query has a user set limit, then do not split.
+     if (query.hasLimit()) {
+       c.output(KV.of(key, query));
+       return;
+     }
+
+     int estimatedNumSplits;
+     // Compute the estimated numSplits if numSplits is not specified by the user.
+     if (numSplits <= 0) {
+       estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
+     } else {
+       estimatedNumSplits = numSplits;
+     }
+
+     List<Query> querySplits;
+     try {
+       querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter,
+           estimatedNumSplits);
+     } catch (Exception e) {
+       // Best effort: a query that cannot be split is still read, just without parallelism.
+       LOG.warn("Unable to parallelize the given query: {}", query, e);
+       querySplits = ImmutableList.of(query);
+     }
+
+     // Assign unique keys to query splits.
+     for (Query subquery : querySplits) {
+       c.output(KV.of(key++, subquery));
+     }
+   }
+
+   /** Registers the project, namespace and query from the options as display data. */
+   @Override
+   public void populateDisplayData(Builder builder) {
+     super.populateDisplayData(builder);
+     builder
+         .addIfNotNull(DisplayData.item("projectId", options.getProjectId())
+             .withLabel("ProjectId"))
+         .addIfNotNull(DisplayData.item("namespace", options.getNamespace())
+             .withLabel("Namespace"))
+         .addIfNotNull(DisplayData.item("query", options.getQuery().toString())
+             .withLabel("Query"));
+   }
+ }
+
+ /**
+ * A {@link DoFn} that reads entities from Datastore for each query.
+ *
+ * <p>Each input query is executed in pages of at most {@code QUERY_BATCH_LIMIT} results. Every
+ * page after the first resumes from the end cursor of the previous batch, and all entities of
+ * a page are emitted before the next page is requested. A limit set on the input query is
+ * honored across pages.
+ */
+ @VisibleForTesting
+ static class ReadFn extends DoFn<Query, Entity> {
+ private final V1Options options;
+ private final V1DatastoreFactory datastoreFactory;
+ // Datastore client; transient, assigned in startBundle.
+ private transient Datastore datastore;
+
+ public ReadFn(V1Options options) {
+ this(options, new V1DatastoreFactory());
+ }
+
+ @VisibleForTesting
+ ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) {
+ this.options = options;
+ this.datastoreFactory = datastoreFactory;
+ }
+
+ @StartBundle
+ public void startBundle(Context c) throws Exception {
+ datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
+ }
+
+ /**
+ * Read and output entities for the given query.
+ *
+ * <p>Loops until either the user limit (if any) has been consumed, or the last batch was
+ * smaller than {@code QUERY_BATCH_LIMIT} and did not report {@code NOT_FINISHED}.
+ */
+ @ProcessElement
+ public void processElement(ProcessContext context) throws Exception {
+ Query query = context.element();
+ String namespace = options.getNamespace();
+ int userLimit = query.hasLimit()
+ ? query.getLimit().getValue() : Integer.MAX_VALUE; // MAX_VALUE stands for "no user limit"
+
+ boolean moreResults = true;
+ QueryResultBatch currentBatch = null;
+
+ while (moreResults) {
+ Query.Builder queryBuilder = query.toBuilder().clone();
+ queryBuilder.setLimit(Int32Value.newBuilder().setValue(
+ Math.min(userLimit, QUERY_BATCH_LIMIT))); // never request more than one page at a time
+
+ if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+ queryBuilder.setStartCursor(currentBatch.getEndCursor()); // resume after the previous page
+ }
+
+ RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+ RunQueryResponse response = datastore.runQuery(request);
+
+ currentBatch = response.getBatch();
+
+ // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
+ // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
+ // use result count to determine if more results might exist.
+ int numFetch = currentBatch.getEntityResultsCount();
+ if (query.hasLimit()) {
+ verify(userLimit >= numFetch,
+ "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit",
+ userLimit, numFetch, query.getLimit());
+ userLimit -= numFetch; // remaining budget for the following pages
+ }
+
+ // Output all the entities from the current batch.
+ for (EntityResult entityResult : currentBatch.getEntityResultsList()) {
+ context.output(entityResult.getEntity());
+ }
+
+ // Check if we have more entities to be read.
+ moreResults =
+ // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied
+ (userLimit > 0)
+ // All indications from the API are that there are/may be more results.
+ && ((numFetch == QUERY_BATCH_LIMIT)
+ || (currentBatch.getMoreResults() == NOT_FINISHED));
+ }
+ }
+ }
+ }
+
+ /**
+  * Creates a {@link DatastoreV1.Write} transform with no project configured. Supply the
+  * destination {@code projectId} through {@link DatastoreV1.Write#withProjectId}.
+  */
+ public Write write() {
+   final String unsetProjectId = null;
+   return new Write(unsetProjectId);
+ }
+
+ /**
+  * Creates a {@link DeleteEntity} transform with no project configured. Supply the destination
+  * {@code projectId} through {@link DeleteEntity#withProjectId}.
+  */
+ public DeleteEntity deleteEntity() {
+   final String unsetProjectId = null;
+   return new DeleteEntity(unsetProjectId);
+ }
+
+ /**
+  * Creates a {@link DeleteKey} transform with no project configured. Supply the destination
+  * {@code projectId} through {@link DeleteKey#withProjectId}.
+  */
+ public DeleteKey deleteKey() {
+   final String unsetProjectId = null;
+   return new DeleteKey(unsetProjectId);
+ }
+
+ /**
+ * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore.
+ *
+ * @see DatastoreIO
+ */
+ public static class Write extends Mutate<Entity> {
+ /**
+ * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+ * it is {@code null} at instantiation time, an error will be thrown.
+ */
+ Write(@Nullable String projectId) {
+ super(projectId, new UpsertFn());
+ }
+
+ /**
+ * Returns a new {@link Write} that writes to the Cloud Datastore for the specified project.
+ */
+ public Write withProjectId(String projectId) {
+ checkNotNull(projectId, "projectId");
+ return new Write(projectId);
+ }
+ }
+
+ /**
+ * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore.
+ *
+ * @see DatastoreIO
+ */
+ public static class DeleteEntity extends Mutate<Entity> {
+ /**
+ * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+ * it is {@code null} at instantiation time, an error will be thrown.
+ */
+ DeleteEntity(@Nullable String projectId) {
+ super(projectId, new DeleteEntityFn());
+ }
+
+ /**
+ * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore for the
+ * specified project.
+ */
+ public DeleteEntity withProjectId(String projectId) {
+ checkNotNull(projectId, "projectId");
+ return new DeleteEntity(projectId);
+ }
+ }
+
+ /**
+ * A {@link PTransform} that deletes {@link Entity Entities} associated with the given
+ * {@link Key Keys} from Cloud Datastore.
+ *
+ * @see DatastoreIO
+ */
+ public static class DeleteKey extends Mutate<Key> {
+ /**
+ * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
+ * it is {@code null} at instantiation time, an error will be thrown.
+ */
+ DeleteKey(@Nullable String projectId) {
+ super(projectId, new DeleteKeyFn());
+ }
+
+ /**
+ * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore for the
+ * specified project.
+ */
+ public DeleteKey withProjectId(String projectId) {
+ checkNotNull(projectId, "projectId");
+ return new DeleteKey(projectId);
+ }
+ }
+
+ /**
+  * A {@link PTransform} that writes mutations to Cloud Datastore.
+  *
+  * <p>It requires a {@link SimpleFunction} that transforms an object of type {@code T} to a
+  * {@link Mutation}. {@code T} is usually either an {@link Entity} or a {@link Key}.
+  *
+  * <p><b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete) should
+  * be produced by the function provided, as the commits are retried when failures occur.
+  */
+ private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
+   @Nullable
+   private final String projectId;
+   /** A function that transforms each {@code T} into a mutation. */
+   private final SimpleFunction<T, Mutation> mutationFn;
+
+   /**
+    * Note that {@code projectId} is only {@code @Nullable} as a matter of build order; it must
+    * be set by the time the transform is validated or applied.
+    *
+    * @param projectId the destination project, possibly not yet known
+    * @param mutationFn converts each input element into a {@link Mutation}; must not be null
+    */
+   Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
+     this.projectId = projectId;
+     this.mutationFn = checkNotNull(mutationFn);
+   }
+
+   /** Maps every element to a {@link Mutation} and writes the mutations to Datastore. */
+   @Override
+   public PDone apply(PCollection<T> input) {
+     input.apply("Convert to Mutation", MapElements.via(mutationFn))
+         .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
+
+     return PDone.in(input.getPipeline());
+   }
+
+   /** Fails with a {@link NullPointerException} when the project or the function is unset. */
+   @Override
+   public void validate(PCollection<T> input) {
+     checkNotNull(projectId, "projectId");
+     checkNotNull(mutationFn, "mutationFn");
+   }
+
+   @Override
+   public String toString() {
+     return MoreObjects.toStringHelper(getClass())
+         .add("projectId", projectId)
+         .add("mutationFn", mutationFn.getClass().getName())
+         .toString();
+   }
+
+   @Override
+   public void populateDisplayData(DisplayData.Builder builder) {
+     super.populateDisplayData(builder);
+     builder
+         .addIfNotNull(DisplayData.item("projectId", projectId)
+             .withLabel("Output Project"))
+         .include(mutationFn);
+   }
+
+   /** Returns the destination project id, or {@code null} if it has not been set. */
+   @Nullable
+   public String getProjectId() {
+     return projectId;
+   }
+ }
+
+ /**
+ * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
+ * batches, where the maximum batch size is {@link DatastoreV1#DATASTORE_BATCH_UPDATE_LIMIT}.
+ *
+ * <p>See <a
+ * href="https://cloud.google.com/datastore/docs/concepts/entities">
+ * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations.
+ *
+ * <p>Commits are non-transactional. If a commit fails with a {@link DatastoreException}, the
+ * commit will be retried with exponential backoff (bounded by
+ * {@link DatastoreWriterFn#MAX_RETRIES}). This means that the mutation operation should be
+ * idempotent. Thus, the writer should only be used for {@code upsert} and {@code delete}
+ * mutation operations, as these are the only two Cloud Datastore mutations that are idempotent.
+ */
+ @VisibleForTesting
+ static class DatastoreWriterFn extends DoFn<Mutation, Void> {
+ private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
+ private final String projectId;
+ private transient Datastore datastore;
+ private final V1DatastoreFactory datastoreFactory;
+ // Mutations buffered since the last flush; cleared after every successful commit.
+ private final List<Mutation> mutations = new ArrayList<>();
+ /**
+ * Since a bundle is written in batches, we should retry the commit of a batch in order to
+ * prevent transient errors from causing the bundle to fail.
+ */
+ private static final int MAX_RETRIES = 5;
+
+ /**
+ * Initial backoff time for exponential backoff for retry attempts.
+ */
+ private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+ DatastoreWriterFn(String projectId) {
+ this(projectId, new V1DatastoreFactory());
+ }
+
+ @VisibleForTesting
+ DatastoreWriterFn(String projectId, V1DatastoreFactory datastoreFactory) {
+ this.projectId = checkNotNull(projectId, "projectId");
+ this.datastoreFactory = datastoreFactory;
+ }
+
+ @StartBundle
+ public void startBundle(Context c) {
+ datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
+ }
+
+ @ProcessElement
+ public void processElement(ProcessContext c) throws Exception {
+ mutations.add(c.element());
+ if (mutations.size() >= DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT) {
+ flushBatch(); // the buffer reached the batch limit
+ }
+ }
+
+ @FinishBundle
+ public void finishBundle(Context c) throws Exception {
+ if (mutations.size() > 0) {
+ flushBatch(); // write whatever is left over at the end of the bundle
+ }
+ }
+
+ /**
+ * Writes a batch of mutations to Cloud Datastore and clears the buffer on success.
+ *
+ * <p>If a commit fails, it will be retried with exponential backoff (bounded by
+ * {@link DatastoreWriterFn#MAX_RETRIES}). All mutations in the batch will be committed again,
+ * even if the commit was partially successful. If the retry limit is exceeded, the last
+ * exception from the Datastore will be thrown and the buffer is left untouched.
+ *
+ * @throws DatastoreException if the commit still fails once the retries are exhausted
+ * @throws IOException if backing off between retries fails
+ * @throws InterruptedException if the thread is interrupted while backing off
+ */
+ private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+ LOG.debug("Writing batch of {} mutations", mutations.size());
+ Sleeper sleeper = Sleeper.DEFAULT;
+ BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+ while (true) {
+ // Commit the whole pending batch in a single non-transactional request.
+ try {
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ commitRequest.addAllMutations(mutations);
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ datastore.commit(commitRequest.build());
+ // Break if the commit threw no exception.
+ break;
+ } catch (DatastoreException exception) {
+ // Only log the code and message for potentially-transient errors. The entire exception
+ // will be propagated upon the last retry.
+ LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+ exception.getMessage());
+ if (!BackOffUtils.next(sleeper, backoff)) {
+ LOG.error("Aborting after {} retries.", MAX_RETRIES);
+ throw exception;
+ }
+ }
+ }
+ LOG.debug("Successfully wrote {} mutations", mutations.size());
+ mutations.clear();
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ super.populateDisplayData(builder);
+ builder
+ .addIfNotNull(DisplayData.item("projectId", projectId)
+ .withLabel("Output Project"));
+ }
+ }
+
+ /**
+  * Tells whether a Datastore key is complete, i.e. whether the last element of its path carries
+  * either a non-zero id or a non-empty name. A key with an empty path is never complete.
+  */
+ static boolean isValidKey(Key key) {
+   List<PathElement> path = key.getPathList();
+   if (!path.isEmpty()) {
+     PathElement leaf = path.get(path.size() - 1);
+     boolean hasId = leaf.getId() != 0;
+     return hasId || !leaf.getName().isEmpty();
+   }
+   return false;
+ }
+
+ /**
+ * A function that constructs an upsert {@link Mutation} from an {@link Entity}.
+ */
+ @VisibleForTesting
+ static class UpsertFn extends SimpleFunction<Entity, Mutation> {
+ @Override
+ public Mutation apply(Entity entity) {
+ // Verify that the entity to write has a complete key.
+ checkArgument(isValidKey(entity.getKey()),
+ "Entities to be written to the Datastore must have complete keys:\n%s", entity);
+
+ return makeUpsert(entity).build();
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add(DisplayData.item("upsertFn", this.getClass())
+ .withLabel("Create Upsert Mutation"));
+ }
+ }
+
+ /**
+ * A function that constructs a delete {@link Mutation} from an {@link Entity}.
+ */
+ @VisibleForTesting
+ static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> {
+ @Override
+ public Mutation apply(Entity entity) {
+ // Verify that the entity to delete has a complete key.
+ checkArgument(isValidKey(entity.getKey()),
+ "Entities to be deleted from the Datastore must have complete keys:\n%s", entity);
+
+ return makeDelete(entity.getKey()).build();
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add(DisplayData.item("deleteEntityFn", this.getClass())
+ .withLabel("Create Delete Mutation"));
+ }
+ }
+
+ /**
+ * A function that constructs a delete {@link Mutation} from a {@link Key}.
+ */
+ @VisibleForTesting
+ static class DeleteKeyFn extends SimpleFunction<Key, Mutation> {
+ @Override
+ public Mutation apply(Key key) {
+ // Verify that the entity to delete has a complete key.
+ checkArgument(isValidKey(key),
+ "Keys to be deleted from the Datastore must be complete:\n%s", key);
+
+ return makeDelete(key).build();
+ }
+
+ @Override
+ public void populateDisplayData(Builder builder) {
+ builder.add(DisplayData.item("deleteKeyFn", this.getClass())
+ .withLabel("Create Delete Mutation"));
+ }
+ }
+
+ /**
+  * A serializable factory wrapping the Datastore singleton classes {@link DatastoreFactory} and
+  * {@link QuerySplitter}.
+  *
+  * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, so they are
+  * looked up on demand through this class, which implements {@link Serializable}.
+  */
+ @VisibleForTesting
+ static class V1DatastoreFactory implements Serializable {
+
+   /** Creates a Datastore client for the given pipeline options and project. */
+   public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+     DatastoreOptions.Builder optionsBuilder = new DatastoreOptions.Builder();
+     optionsBuilder.projectId(projectId);
+     optionsBuilder.initializer(new RetryHttpRequestInitializer());
+
+     // A credential is attached only when the pipeline options provide one.
+     GcpOptions gcpOptions = pipelineOptions.as(GcpOptions.class);
+     Credential credential = gcpOptions.getGcpCredential();
+     if (credential != null) {
+       optionsBuilder.credential(credential);
+     }
+
+     DatastoreOptions datastoreOptions = optionsBuilder.build();
+     return DatastoreFactory.get().create(datastoreOptions);
+   }
+
+   /** Returns the Datastore {@link QuerySplitter} supplied by {@link DatastoreHelper}. */
+   public QuerySplitter getQuerySplitter() {
+     QuerySplitter splitter = DatastoreHelper.getQuerySplitter();
+     return splitter;
+   }
+ }
+}
[3/4] incubator-beam git commit: DatastoreIO v1beta3 to v1
Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
deleted file mode 100644
index 8503b66..0000000
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3.java
+++ /dev/null
@@ -1,1033 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Verify.verify;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.Flatten;
-import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.Values;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayData.Builder;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.ImmutableList;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreHelper;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.NoSuchElementException;
-import javax.annotation.Nullable;
-
-/**
- * <p>{@link V1Beta3} provides an API to Read, Write and Delete {@link PCollection PCollections} of
- * <a href="https://developers.google.com/datastore/">Google Cloud Datastore</a> version v1beta3
- * {@link Entity} objects.
- *
- * <p>This API currently requires an authentication workaround. To use {@link V1Beta3}, users
- * must use the {@code gcloud} command line tool to get credentials for Datastore:
- * <pre>
- * $ gcloud auth login
- * </pre>
- *
- * <p>To read a {@link PCollection} from a query to Datastore, use {@link V1Beta3#read} and
- * its methods {@link V1Beta3.Read#withProjectId} and {@link V1Beta3.Read#withQuery} to
- * specify the project to query and the query to read from. You can optionally provide a namespace
- * to query within using {@link V1Beta3.Read#withNamespace}. You could also optionally specify
- * how many splits you want for the query using {@link V1Beta3.Read#withNumQuerySplits}.
- *
- * <p>For example:
- *
- * <pre> {@code
- * // Read a query from Datastore
- * PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
- * Query query = ...;
- * String projectId = "...";
- *
- * Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(
- * DatastoreIO.v1beta3().read()
- * .withProjectId(projectId)
- * .withQuery(query));
- * } </pre>
- *
- * <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
- * many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
- * all returned results will be read by a single Dataflow worker in order to ensure correct data.
- *
- * <p>To write a {@link PCollection} to a Datastore, use {@link V1Beta3#write},
- * specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.v1beta3().write().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>To delete a {@link PCollection} of {@link Entity Entities} from Datastore, use
- * {@link V1Beta3#deleteEntity()}, specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.v1beta3().deleteEntity().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>To delete entities associated with a {@link PCollection} of {@link Key Keys} from Datastore,
- * use {@link V1Beta3#deleteKey}, specifying the Cloud Datastore project to write to:
- *
- * <pre> {@code
- * PCollection<Key> keys = ...;
- * keys.apply(DatastoreIO.v1beta3().deleteKey().withProjectId(projectId));
- * p.run();
- * } </pre>
- *
- * <p>{@link Entity Entities} in the {@code PCollection} to be written or deleted must have complete
- * {@link Key Keys}. Complete {@code Keys} specify the {@code name} or {@code id} of the
- * {@code Entity}, whereas incomplete {@code Keys} do not. A {@code namespace} other than
- * {@code projectId} default may be used by specifying it in the {@code Entity} {@code Keys}.
- *
- * <pre>{@code
- * Key.Builder keyBuilder = DatastoreHelper.makeKey(...);
- * keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
- * }</pre>
- *
- * <p>{@code Entities} will be committed as upsert (update or insert) or delete mutations. Please
- * read <a href="https://cloud.google.com/datastore/docs/concepts/entities">Entities, Properties,
- * and Keys</a> for more information about {@code Entity} keys.
- *
- * <p><h3>Permissions</h3>
- * Permission requirements depend on the {@code PipelineRunner} that is used to execute the
- * Dataflow job. Please refer to the documentation of corresponding {@code PipelineRunner}s for
- * more details.
- *
- * <p>Please see <a href="https://cloud.google.com/datastore/docs/activate">Cloud Datastore Sign Up
- * </a>for security and permission related information specific to Datastore.
- *
- * @see org.apache.beam.sdk.runners.PipelineRunner
- */
-@Experimental(Experimental.Kind.SOURCE_SINK)
-public class V1Beta3 {
-
- // A package-private constructor to prevent direct instantiation from outside of this package
- V1Beta3() {}
-
- /**
- * Datastore has a limit of 500 mutations per batch operation, so we flush
- * changes to Datastore every 500 entities.
- */
- @VisibleForTesting
- static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
-
- /**
- * Returns an empty {@link V1Beta3.Read} builder. Configure the source {@code projectId},
- * {@code query}, and optionally {@code namespace} and {@code numQuerySplits} using
- * {@link V1Beta3.Read#withProjectId}, {@link V1Beta3.Read#withQuery},
- * {@link V1Beta3.Read#withNamespace}, {@link V1Beta3.Read#withNumQuerySplits}.
- *
- * <p>The returned builder starts with {@code projectId}, {@code query} and {@code namespace}
- * set to {@code null} and {@code numQuerySplits} set to 0.
- */
- public V1Beta3.Read read() {
- return new V1Beta3.Read(null, null, null, 0);
- }
-
- /**
- * A {@link PTransform} that reads the result rows of a Datastore query as {@code Entity}
- * objects.
- *
- * @see DatastoreIO
- */
- public static class Read extends PTransform<PBegin, PCollection<Entity>> {
- private static final Logger LOG = LoggerFactory.getLogger(Read.class);
-
- /** An upper bound on the number of splits for a query. */
- public static final int NUM_QUERY_SPLITS_MAX = 50000;
-
- /** A lower bound on the number of splits for a query. */
- static final int NUM_QUERY_SPLITS_MIN = 12;
-
- /** Default bundle size of 64MB. */
- static final long DEFAULT_BUNDLE_SIZE_BYTES = 64 * 1024 * 1024;
-
- /**
- * Maximum number of results to request per query.
- *
- * <p>Must be set, or it may result in an I/O error when querying Cloud Datastore.
- */
- static final int QUERY_BATCH_LIMIT = 500;
-
- @Nullable
- private final String projectId;
-
- @Nullable
- private final Query query;
-
- @Nullable
- private final String namespace;
-
- private final int numQuerySplits;
-
- /**
- * Computes the number of splits to be performed on the given query by querying the estimated
- * size from Datastore.
- *
- * <p>The estimate is the estimated size in bytes divided by {@code DEFAULT_BUNDLE_SIZE_BYTES},
- * rounded, and clamped to the range {@code [NUM_QUERY_SPLITS_MIN, NUM_QUERY_SPLITS_MAX]}. If
- * the size cannot be obtained, a warning is logged and {@code NUM_QUERY_SPLITS_MIN} is
- * returned.
- */
- static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
- int numSplits;
- try {
- long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
- numSplits = (int) Math.min(NUM_QUERY_SPLITS_MAX,
- Math.round(((double) estimatedSizeBytes) / DEFAULT_BUNDLE_SIZE_BYTES));
- } catch (Exception e) {
- LOG.warn("Failed the fetch estimatedSizeBytes for query: {}", query, e);
- // Fallback in case estimated size is unavailable.
- numSplits = NUM_QUERY_SPLITS_MIN;
- }
- return Math.max(numSplits, NUM_QUERY_SPLITS_MIN);
- }
-
- /**
- * Get the estimated size of the data returned by the given query.
- *
- * <p>Datastore provides no way to get a good estimate of how large the result of a query
- * will be. Instead, this returns the {@code entity_bytes} statistic of the entity kind being
- * queried, using the __Stat_Kind__ system table (or __Ns_Stat_Kind__ when a namespace is
- * given), assuming exactly 1 kind is specified in the query. Only the first kind of the query
- * is looked up, and the most recent statistics entry is used.
- *
- * <p>See https://cloud.google.com/datastore/docs/concepts/stats.
- *
- * @throws NoSuchElementException if Datastore returns no statistics for the kind
- * @throws DatastoreException if the statistics query fails
- */
- static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
- throws DatastoreException {
- String ourKind = query.getKind(0).getName();
- Query.Builder queryBuilder = Query.newBuilder();
- if (namespace == null) {
- queryBuilder.addKindBuilder().setName("__Stat_Kind__");
- } else {
- queryBuilder.addKindBuilder().setName("__Ns_Stat_Kind__");
- }
- queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
-
- // Get the latest statistics: newest timestamp first, one result only.
- queryBuilder.addOrder(makeOrder("timestamp", DESCENDING));
- queryBuilder.setLimit(Int32Value.newBuilder().setValue(1));
-
- RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
-
- long now = System.currentTimeMillis();
- RunQueryResponse response = datastore.runQuery(request);
- LOG.debug("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
-
- QueryResultBatch batch = response.getBatch();
- if (batch.getEntityResultsCount() == 0) {
- throw new NoSuchElementException(
- "Datastore statistics for kind " + ourKind + " unavailable");
- }
- Entity entity = batch.getEntityResults(0).getEntity();
- return entity.getProperties().get("entity_bytes").getIntegerValue();
- }
-
- /**
- * Wraps {@code query} in a {@link RunQueryRequest}. The partition's namespace id is set only
- * when a non-null {@code namespace} is supplied.
- */
- static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
- RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
- request.setQuery(query);
- if (namespace == null) {
- return request.build();
- }
- request.getPartitionIdBuilder().setNamespaceId(namespace);
- return request.build();
- }
-
- /**
- * Asks the {@code querySplitter} to break {@code query} into {@code numSplits} sub-queries.
- * When a {@code namespace} is given it is placed in the partition id passed to the splitter,
- * so that the splits are calculated for that namespace.
- */
- private static List<Query> splitQuery(Query query, @Nullable String namespace,
- Datastore datastore, QuerySplitter querySplitter, int numSplits) throws DatastoreException {
- PartitionId partition = (namespace == null)
- ? PartitionId.newBuilder().build()
- : PartitionId.newBuilder().setNamespaceId(namespace).build();
-
- return querySplitter.getSplits(query, partition, numSplits, datastore);
- }
-
- /**
- * Creates a {@code Read} holding the given configuration. No validation happens here.
- *
- * <p>Note that only {@code namespace} is really {@code @Nullable}. The other parameters may be
- * {@code null} as a matter of build order, but if they are {@code null} at instantiation time,
- * an error will be thrown.
- *
- * @param projectId project to read from, or {@code null} if not configured yet
- * @param query query to read, or {@code null} if not configured yet
- * @param namespace namespace to read from, or {@code null} for none
- * @param numQuerySplits requested number of query splits, stored as given
- */
- private Read(@Nullable String projectId, @Nullable Query query, @Nullable String namespace,
- int numQuerySplits) {
- this.projectId = projectId;
- this.query = query;
- this.namespace = namespace;
- this.numQuerySplits = numQuerySplits;
- }
-
- /**
- * Returns a new {@link V1Beta3.Read} that reads from the Datastore for the specified project.
- */
- public V1Beta3.Read withProjectId(String projectId) {
- checkNotNull(projectId, "projectId");
- return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
- }
-
- /**
- * Returns a new {@link V1Beta3.Read} that reads the results of the specified query.
- *
- * <p><b>Note:</b> Normally, {@code DatastoreIO} will read from Cloud Datastore in parallel
- * across many workers. However, when the {@link Query} is configured with a limit using
- * {@link Query.Builder#setLimit}, then all results will be read by a single worker in order
- * to ensure correct results.
- */
- public V1Beta3.Read withQuery(Query query) {
- checkNotNull(query, "query");
- checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
- "Invalid query limit %s: must be positive", query.getLimit().getValue());
- return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
- }
-
- /**
- * Returns a new {@link V1Beta3.Read} that reads from the given namespace.
- *
- * <p>The project, query and split count are carried over unchanged. The argument is not
- * null-checked here.
- *
- * @param namespace the namespace to read from
- */
- public V1Beta3.Read withNamespace(String namespace) {
- return new V1Beta3.Read(projectId, query, namespace, numQuerySplits);
- }
-
- /**
- * Returns a new {@link V1Beta3.Read} that reads by splitting the given {@code query} into
- * {@code numQuerySplits}.
- *
- * <p>The semantics for the query splitting is defined below:
- * <ul>
- * <li>Any value less than or equal to 0 will be ignored, and the number of splits will be
- * chosen dynamically at runtime based on the query data size.
- * <li>Any value greater than {@link Read#NUM_QUERY_SPLITS_MAX} will be capped at
- * {@code NUM_QUERY_SPLITS_MAX}.
- * <li>If the {@code query} has a user limit set, then {@code numQuerySplits} will be
- * ignored and no split will be performed.
- * <li>Under certain cases Cloud Datastore is unable to split query to the requested number of
- * splits. In such cases we just use whatever the Datastore returns.
- * </ul>
- */
- public V1Beta3.Read withNumQuerySplits(int numQuerySplits) {
- return new V1Beta3.Read(projectId, query, namespace,
- Math.min(Math.max(numQuerySplits, 0), NUM_QUERY_SPLITS_MAX));
- }
-
- /** Returns the configured query, or {@code null} if none has been set. */
- @Nullable
- public Query getQuery() {
- return query;
- }
-
- /** Returns the configured project id, or {@code null} if none has been set. */
- @Nullable
- public String getProjectId() {
- return projectId;
- }
-
- /** Returns the configured namespace, or {@code null} if none has been set. */
- @Nullable
- public String getNamespace() {
- return namespace;
- }
-
-
- /**
- * {@inheritDoc}
- *
- * <p>Expands into three steps: split the query, redistribute the sub-queries through a
- * {@link GroupByKey}, and read the entities of each sub-query. Building the
- * {@code V1Beta3Options} throws if the project id or the query has not been set.
- */
- @Override
- public PCollection<Entity> apply(PBegin input) {
- V1Beta3Options v1Beta3Options = V1Beta3Options.from(getProjectId(), getQuery(),
- getNamespace());
-
- /*
- * This composite transform involves the following steps:
- * 1. Create a singleton of the user provided {@code query} and apply a {@link ParDo} that
- * splits the query into {@code numQuerySplits} and assign each split query a unique
- * {@code Integer} as the key. The resulting output is of the type
- * {@code PCollection<KV<Integer, Query>>}.
- *
- * If the value of {@code numQuerySplits} is less than or equal to 0, then the number of
- * splits will be computed dynamically based on the size of the data for the {@code query}.
- *
- * 2. The resulting {@code PCollection} is sharded using a {@link GroupByKey} operation. The
- * queries are extracted from the {@code KV<Integer, Iterable<Query>>} and flattened to
- * output a {@code PCollection<Query>}.
- *
- * 3. In the third step, a {@code ParDo} reads entities for each query and outputs
- * a {@code PCollection<Entity>}.
- */
- PCollection<KV<Integer, Query>> queries = input
- .apply(Create.of(query))
- .apply(ParDo.of(new SplitQueryFn(v1Beta3Options, numQuerySplits)));
-
- PCollection<Query> shardedQueries = queries
- .apply(GroupByKey.<Integer, Query>create())
- .apply(Values.<Iterable<Query>>create())
- .apply(Flatten.<Query>iterables());
-
- PCollection<Entity> entities = shardedQueries
- .apply(ParDo.of(new ReadFn(v1Beta3Options)));
-
- return entities;
- }
-
- /** Checks that both the project id and the query have been set; the namespace is optional. */
- @Override
- public void validate(PBegin input) {
- checkNotNull(projectId, "projectId");
- checkNotNull(query, "query");
- }
-
- /**
- * Registers the project id, namespace and query as display data. Each item is added only when
- * its value is non-null.
- */
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- // query stays null until withQuery() is called, so guard before calling toString();
- // otherwise a NullPointerException is thrown before addIfNotNull can skip the item.
- String queryString = (query == null) ? null : query.toString();
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("ProjectId"))
- .addIfNotNull(DisplayData.item("namespace", namespace)
- .withLabel("Namespace"))
- .addIfNotNull(DisplayData.item("query", queryString)
- .withLabel("Query"));
- }
-
- /** Returns a string listing the project id, query and namespace of this transform. */
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("projectId", projectId)
- .add("query", query)
- .add("namespace", namespace)
- .toString();
- }
-
- /**
- * A class for v1beta3 Datastore related options.
- *
- * <p>Holds the project id, query and optional namespace that are handed to the
- * {@code DoFn}s of the read. The project id and query are required and are null-checked on
- * construction; only the namespace may be {@code null}.
- */
- @VisibleForTesting
- static class V1Beta3Options implements Serializable {
- private final Query query;
- private final String projectId;
- @Nullable
- private final String namespace;
-
- private V1Beta3Options(String projectId, Query query, @Nullable String namespace) {
- this.projectId = checkNotNull(projectId, "projectId");
- this.query = checkNotNull(query, "query");
- this.namespace = namespace;
- }
-
- public static V1Beta3Options from(String projectId, Query query, @Nullable String namespace) {
- return new V1Beta3Options(projectId, query, namespace);
- }
-
- public Query getQuery() {
- return query;
- }
-
- public String getProjectId() {
- return projectId;
- }
-
- @Nullable
- public String getNamespace() {
- return namespace;
- }
- }
-
- /**
- * A {@link DoFn} that splits a given query into multiple sub-queries, assigns them unique
- * keys and outputs them as {@link KV}.
- *
- * <p>A query that carries a user limit is emitted unsplit under key 1. If splitting fails,
- * the failure is logged and the original query is emitted as the only sub-query.
- */
- @VisibleForTesting
- static class SplitQueryFn extends DoFn<Query, KV<Integer, Query>> {
- private final V1Beta3Options options;
- // number of splits to make for a given query
- private final int numSplits;
-
- private final V1Beta3DatastoreFactory datastoreFactory;
- // Datastore client
- private transient Datastore datastore;
- // Query splitter
- private transient QuerySplitter querySplitter;
-
- /** Creates a split function that obtains its clients from a default factory. */
- public SplitQueryFn(V1Beta3Options options, int numSplits) {
- this(options, numSplits, new V1Beta3DatastoreFactory());
- }
-
- /** Creates a split function with an explicit factory, which lets tests supply their own. */
- @VisibleForTesting
- SplitQueryFn(V1Beta3Options options, int numSplits,
- V1Beta3DatastoreFactory datastoreFactory) {
- this.options = options;
- this.numSplits = numSplits;
- this.datastoreFactory = datastoreFactory;
- }
-
- /** Obtains the Datastore client and the query splitter from the factory for this bundle. */
- @StartBundle
- public void startBundle(Context c) throws Exception {
- // Go through the accessor, as ReadFn does, instead of reading the options' private field.
- datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
- querySplitter = datastoreFactory.getQuerySplitter();
- }
-
- /** Splits the incoming query and outputs each sub-query under its own integer key. */
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- int key = 1;
- Query query = c.element();
-
- // If query has a user set limit, then do not split.
- if (query.hasLimit()) {
- c.output(KV.of(key, query));
- return;
- }
-
- int estimatedNumSplits;
- // Compute the estimated numSplits if numSplits is not specified by the user.
- if (numSplits <= 0) {
- estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
- } else {
- estimatedNumSplits = numSplits;
- }
-
- List<Query> querySplits;
- try {
- querySplits = splitQuery(query, options.getNamespace(), datastore, querySplitter,
- estimatedNumSplits);
- } catch (Exception e) {
- // Best effort: fall back to reading the whole query as a single split.
- LOG.warn("Unable to parallelize the given query: {}", query, e);
- querySplits = ImmutableList.of(query);
- }
-
- // assign unique keys to query splits.
- for (Query subquery : querySplits) {
- c.output(KV.of(key++, subquery));
- }
- }
-
- @Override
- public void populateDisplayData(Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", options.getProjectId())
- .withLabel("ProjectId"))
- .addIfNotNull(DisplayData.item("namespace", options.getNamespace())
- .withLabel("Namespace"))
- .addIfNotNull(DisplayData.item("query", options.getQuery().toString())
- .withLabel("Query"));
- }
- }
-
- /**
- * A {@link DoFn} that reads entities from Datastore for each query.
- *
- * <p>Results are fetched in batches of at most {@code QUERY_BATCH_LIMIT} entities. Each batch
- * resumes from the previous batch's end cursor, and fetching stops once the user limit (if
- * any) is used up or the response no longer indicates further results.
- */
- @VisibleForTesting
- static class ReadFn extends DoFn<Query, Entity> {
- private final V1Beta3Options options;
- private final V1Beta3DatastoreFactory datastoreFactory;
- // Datastore client
- private transient Datastore datastore;
-
- public ReadFn(V1Beta3Options options) {
- this(options, new V1Beta3DatastoreFactory());
- }
-
- @VisibleForTesting
- ReadFn(V1Beta3Options options, V1Beta3DatastoreFactory datastoreFactory) {
- this.options = options;
- this.datastoreFactory = datastoreFactory;
- }
-
- @StartBundle
- public void startBundle(Context c) throws Exception {
- datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), options.getProjectId());
- }
-
- /**
- * Read and output entities for the given query.
- *
- * <p>{@code userLimit} tracks how many more entities the query's limit still allows; it is
- * {@code Integer.MAX_VALUE} when the query has no limit.
- */
- @ProcessElement
- public void processElement(ProcessContext context) throws Exception {
- Query query = context.element();
- String namespace = options.getNamespace();
- int userLimit = query.hasLimit()
- ? query.getLimit().getValue() : Integer.MAX_VALUE;
-
- boolean moreResults = true;
- QueryResultBatch currentBatch = null;
-
- while (moreResults) {
- Query.Builder queryBuilder = query.toBuilder().clone();
- queryBuilder.setLimit(Int32Value.newBuilder().setValue(
- Math.min(userLimit, QUERY_BATCH_LIMIT)));
-
- if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
- queryBuilder.setStartCursor(currentBatch.getEndCursor());
- }
-
- RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
- RunQueryResponse response = datastore.runQuery(request);
-
- currentBatch = response.getBatch();
-
- // MORE_RESULTS_AFTER_LIMIT is not implemented yet:
- // https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
- // use result count to determine if more results might exist.
- int numFetch = currentBatch.getEntityResultsCount();
- if (query.hasLimit()) {
- verify(userLimit >= numFetch,
- "Expected userLimit %s >= numFetch %s, because query limit %s must be <= userLimit",
- userLimit, numFetch, query.getLimit());
- userLimit -= numFetch;
- }
-
- // output all the entities from the current batch.
- for (EntityResult entityResult : currentBatch.getEntityResultsList()) {
- context.output(entityResult.getEntity());
- }
-
- // Check if we have more entities to be read.
- moreResults =
- // User-limit does not exist (so userLimit == MAX_VALUE) and/or has not been satisfied
- (userLimit > 0)
- // All indications from the API are that there are/may be more results.
- && ((numFetch == QUERY_BATCH_LIMIT)
- || (currentBatch.getMoreResults() == NOT_FINISHED));
- }
- }
- }
- }
-
- /**
- * Returns an empty {@link V1Beta3.Write} builder. Configure the destination
- * {@code projectId} using {@link V1Beta3.Write#withProjectId}.
- *
- * <p>The returned transform has no project id yet.
- */
- public Write write() {
- return new Write(null);
- }
-
- /**
- * Returns an empty {@link DeleteEntity} builder. Configure the destination
- * {@code projectId} using {@link DeleteEntity#withProjectId}.
- *
- * <p>The returned transform has no project id yet.
- */
- public DeleteEntity deleteEntity() {
- return new DeleteEntity(null);
- }
-
- /**
- * Returns an empty {@link DeleteKey} builder. Configure the destination
- * {@code projectId} using {@link DeleteKey#withProjectId}.
- *
- * <p>The returned transform has no project id yet.
- */
- public DeleteKey deleteKey() {
- return new DeleteKey(null);
- }
-
- /**
- * A {@link PTransform} that writes {@link Entity} objects to Cloud Datastore, turning each
- * one into a mutation with {@link UpsertFn}.
- *
- * @see DatastoreIO
- */
- public static class Write extends Mutate<Entity> {
- /**
- * Creates a {@code Write} for the given project. {@code projectId} is {@code @Nullable} only
- * as a matter of build order; if it is still {@code null} at instantiation time, an error
- * will be thrown.
- */
- Write(@Nullable String projectId) {
- super(projectId, new UpsertFn());
- }
-
- /**
- * Returns a new {@link Write} that writes to the Cloud Datastore of the given project.
- *
- * @param projectId the destination project; must not be {@code null}
- */
- public Write withProjectId(String projectId) {
- return new Write(checkNotNull(projectId, "projectId"));
- }
- }
-
- /**
- * A {@link PTransform} that deletes {@link Entity Entities} from Cloud Datastore, turning each
- * one into a mutation with {@link DeleteEntityFn}.
- *
- * @see DatastoreIO
- */
- public static class DeleteEntity extends Mutate<Entity> {
- /**
- * Creates a {@code DeleteEntity} for the given project. {@code projectId} is
- * {@code @Nullable} only as a matter of build order; if it is still {@code null} at
- * instantiation time, an error will be thrown.
- */
- DeleteEntity(@Nullable String projectId) {
- super(projectId, new DeleteEntityFn());
- }
-
- /**
- * Returns a new {@link DeleteEntity} that deletes entities from the Cloud Datastore of the
- * given project.
- *
- * @param projectId the project to delete from; must not be {@code null}
- */
- public DeleteEntity withProjectId(String projectId) {
- return new DeleteEntity(checkNotNull(projectId, "projectId"));
- }
- }
-
- /**
- * A {@link PTransform} that deletes the {@link Entity Entities} identified by the given
- * {@link Key Keys} from Cloud Datastore, turning each key into a mutation with
- * {@link DeleteKeyFn}.
- *
- * @see DatastoreIO
- */
- public static class DeleteKey extends Mutate<Key> {
- /**
- * Creates a {@code DeleteKey} for the given project. {@code projectId} is {@code @Nullable}
- * only as a matter of build order; if it is still {@code null} at instantiation time, an
- * error will be thrown.
- */
- DeleteKey(@Nullable String projectId) {
- super(projectId, new DeleteKeyFn());
- }
-
- /**
- * Returns a new {@link DeleteKey} that deletes entities from the Cloud Datastore of the
- * given project.
- *
- * @param projectId the project to delete from; must not be {@code null}
- */
- public DeleteKey withProjectId(String projectId) {
- return new DeleteKey(checkNotNull(projectId, "projectId"));
- }
- }
-
- /**
- * A {@link PTransform} that writes mutations to Cloud Datastore.
- *
- * <p>It requires a function that transforms an object of type {@code T} to a {@link Mutation}.
- * {@code T} is usually either an {@link Entity} or a {@link Key}.
- *
- * <p><b>Note:</b> Only idempotent Cloud Datastore mutation operations (upsert and delete)
- * should be used by the function provided, as the commits are retried when failures occur.
- */
- private abstract static class Mutate<T> extends PTransform<PCollection<T>, PDone> {
- @Nullable
- private final String projectId;
- /** A function that transforms each {@code T} into a mutation. */
- private final SimpleFunction<T, Mutation> mutationFn;
-
- /**
- * Note that {@code projectId} is only {@code @Nullable} as a matter of build order, but if
- * it is {@code null} at instantiation time, an error will be thrown.
- *
- * @param projectId destination project, or {@code null} if not configured yet
- * @param mutationFn function that turns each element into a mutation; must not be
- * {@code null}
- */
- Mutate(@Nullable String projectId, SimpleFunction<T, Mutation> mutationFn) {
- this.projectId = projectId;
- this.mutationFn = checkNotNull(mutationFn);
- }
-
- /** Maps every element to a {@link Mutation} and hands the mutations to the writer. */
- @Override
- public PDone apply(PCollection<T> input) {
- input.apply("Convert to Mutation", MapElements.via(mutationFn))
- .apply("Write Mutation to Datastore", ParDo.of(new DatastoreWriterFn(projectId)));
-
- return PDone.in(input.getPipeline());
- }
-
- /** Checks that the project id and the mutation function are set. */
- @Override
- public void validate(PCollection<T> input) {
- checkNotNull(projectId, "projectId");
- checkNotNull(mutationFn, "mutationFn");
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("projectId", projectId)
- .add("mutationFn", mutationFn.getClass().getName())
- .toString();
- }
-
- @Override
- public void populateDisplayData(DisplayData.Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("Output Project"))
- .include(mutationFn);
- }
-
- /** Returns the destination project id, or {@code null} if none has been set. */
- @Nullable
- public String getProjectId() {
- return projectId;
- }
- }
-
- /**
- * {@link DoFn} that writes {@link Mutation}s to Cloud Datastore. Mutations are written in
- * batches, where the maximum batch size is {@link V1Beta3#DATASTORE_BATCH_UPDATE_LIMIT}.
- *
- * <p>See <a
- * href="https://cloud.google.com/datastore/docs/concepts/entities">
- * Datastore: Entities, Properties, and Keys</a> for information about entity keys and mutations.
- *
- * <p>Commits are non-transactional. If a commit fails with a {@link DatastoreException}, for
- * example because of a conflict over an entity group, the commit will be retried (up to
- * {@link DatastoreWriterFn#MAX_RETRIES} times). This means that the mutation operation should
- * be idempotent. Thus, the writer should only be used for {@code upsert} and {@code delete}
- * mutation operations, as these are the only two Cloud Datastore mutations that are idempotent.
- */
- @VisibleForTesting
- static class DatastoreWriterFn extends DoFn<Mutation, Void> {
- private static final Logger LOG = LoggerFactory.getLogger(DatastoreWriterFn.class);
- private final String projectId;
- private transient Datastore datastore;
- private final V1Beta3DatastoreFactory datastoreFactory;
- // Current batch of mutations to be written.
- private final List<Mutation> mutations = new ArrayList<>();
- /**
- * Since a bundle is written in batches, we should retry the commit of a batch in order to
- * prevent transient errors from causing the bundle to fail.
- */
- private static final int MAX_RETRIES = 5;
-
- /**
- * Initial backoff time for exponential backoff for retry attempts.
- */
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
- DatastoreWriterFn(String projectId) {
- this(projectId, new V1Beta3DatastoreFactory());
- }
-
- @VisibleForTesting
- DatastoreWriterFn(String projectId, V1Beta3DatastoreFactory datastoreFactory) {
- this.projectId = checkNotNull(projectId, "projectId");
- this.datastoreFactory = datastoreFactory;
- }
-
- @StartBundle
- public void startBundle(Context c) {
- datastore = datastoreFactory.getDatastore(c.getPipelineOptions(), projectId);
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- mutations.add(c.element());
- if (mutations.size() >= V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT) {
- flushBatch();
- }
- }
-
- @FinishBundle
- public void finishBundle(Context c) throws Exception {
- if (mutations.size() > 0) {
- flushBatch();
- }
- }
-
- /**
- * Writes a batch of mutations to Cloud Datastore.
- *
- * <p>If a commit fails, it will be retried (up to {@link DatastoreWriterFn#MAX_RETRIES}
- * times). All mutations in the batch will be committed again, even if the commit was partially
- * successful. If the retry limit is exceeded, the last exception from the Datastore will be
- * thrown. The pending batch is cleared only after a successful commit.
- *
- * @throws DatastoreException if the commit fails or IOException or InterruptedException if
- * backing off between retries fails.
- */
- private void flushBatch() throws DatastoreException, IOException, InterruptedException {
- LOG.debug("Writing batch of {} mutations", mutations.size());
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
- while (true) {
- // Batch upsert entities.
- try {
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- commitRequest.addAllMutations(mutations);
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- datastore.commit(commitRequest.build());
- // Break if the commit threw no exception.
- break;
- } catch (DatastoreException exception) {
- // Only log the code and message for potentially-transient errors. The entire exception
- // will be propagated upon the last retry.
- LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
- exception.getMessage());
- if (!BackOffUtils.next(sleeper, backoff)) {
- LOG.error("Aborting after {} retries.", MAX_RETRIES);
- throw exception;
- }
- }
- }
- LOG.debug("Successfully wrote {} mutations", mutations.size());
- mutations.clear();
- }
-
- @Override
- public void populateDisplayData(Builder builder) {
- super.populateDisplayData(builder);
- builder
- .addIfNotNull(DisplayData.item("projectId", projectId)
- .withLabel("Output Project"));
- }
- }
-
- /**
- * Tells whether a Datastore key is complete, i.e. whether the last element of its path
- * carries either a non-zero id or a non-empty name. A key with an empty path is not complete.
- */
- static boolean isValidKey(Key key) {
- List<PathElement> path = key.getPathList();
- int pathLength = path.size();
- if (pathLength == 0) {
- return false;
- }
- PathElement leaf = path.get(pathLength - 1);
- if (leaf.getId() != 0) {
- return true;
- }
- return !leaf.getName().isEmpty();
- }
-
- /**
- * A function that constructs an upsert {@link Mutation} from an {@link Entity}.
- */
- @VisibleForTesting
- static class UpsertFn extends SimpleFunction<Entity, Mutation> {
- @Override
- public Mutation apply(Entity entity) {
- // Verify that the entity to write has a complete key.
- checkArgument(isValidKey(entity.getKey()),
- "Entities to be written to the Datastore must have complete keys:\n%s", entity);
-
- return makeUpsert(entity).build();
- }
-
- @Override
- public void populateDisplayData(Builder builder) {
- builder.add(DisplayData.item("upsertFn", this.getClass())
- .withLabel("Create Upsert Mutation"));
- }
- }
-
- /**
- * A function that constructs a delete {@link Mutation} from an {@link Entity}.
- */
- @VisibleForTesting
- static class DeleteEntityFn extends SimpleFunction<Entity, Mutation> {
- @Override
- public Mutation apply(Entity entity) {
- // Verify that the entity to delete has a complete key.
- checkArgument(isValidKey(entity.getKey()),
- "Entities to be deleted from the Datastore must have complete keys:\n%s", entity);
-
- return makeDelete(entity.getKey()).build();
- }
-
- @Override
- public void populateDisplayData(Builder builder) {
- builder.add(DisplayData.item("deleteEntityFn", this.getClass())
- .withLabel("Create Delete Mutation"));
- }
- }
-
- /**
- * A function that constructs a delete {@link Mutation} from a {@link Key}.
- */
- @VisibleForTesting
- static class DeleteKeyFn extends SimpleFunction<Key, Mutation> {
- @Override
- public Mutation apply(Key key) {
- // Verify that the entity to delete has a complete key.
- checkArgument(isValidKey(key),
- "Keys to be deleted from the Datastore must be complete:\n%s", key);
-
- return makeDelete(key).build();
- }
-
- @Override
- public void populateDisplayData(Builder builder) {
- builder.add(DisplayData.item("deleteKeyFn", this.getClass())
- .withLabel("Create Delete Mutation"));
- }
- }
-
- /**
- * A serializable factory wrapping the Datastore singleton classes {@link DatastoreFactory}
- * and {@link QuerySplitter}.
- *
- * <p>{@link DatastoreFactory} and {@link QuerySplitter} are not java serializable, hence
- * they are obtained through this class, which implements {@link Serializable}.
- */
- @VisibleForTesting
- static class V1Beta3DatastoreFactory implements Serializable {
-
- /**
- * Builds a Datastore client for the given project. The client uses a
- * {@link RetryHttpRequestInitializer}, plus the GCP credential from the pipeline options
- * when one is available.
- */
- public Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
- DatastoreOptions.Builder optionsBuilder = new DatastoreOptions.Builder();
- optionsBuilder.projectId(projectId);
- optionsBuilder.initializer(new RetryHttpRequestInitializer());
-
- Credential gcpCredential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
- if (gcpCredential != null) {
- optionsBuilder.credential(gcpCredential);
- }
-
- DatastoreFactory clientFactory = DatastoreFactory.get();
- return clientFactory.create(optionsBuilder.build());
- }
-
- /** Returns the Datastore {@link QuerySplitter} provided by {@link DatastoreHelper}. */
- public QuerySplitter getQuerySplitter() {
- return DatastoreHelper.getQuerySplitter();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
new file mode 100644
index 0000000..31b5da4
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -0,0 +1,792 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DATASTORE_BATCH_UPDATE_LIMIT;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.DEFAULT_BUNDLE_SIZE_BYTES;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.QUERY_BATCH_LIMIT;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.getEstimatedSizeBytes;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.makeRequest;
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.isValidKey;
+import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.verifyZeroInteractions;
+import static org.mockito.Mockito.when;
+
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DatastoreWriterFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntity;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteEntityFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKey;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.DeleteKeyFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.ReadFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.UpsertFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.V1DatastoreFactory;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Write;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.RunnableOnService;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PartitionId;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
+
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Tests for {@link DatastoreV1}.
+ */
+@RunWith(JUnit4.class)
+public class DatastoreV1Test {
+ private static final String PROJECT_ID = "testProject";
+ private static final String NAMESPACE = "testNamespace";
+ private static final String KIND = "testKind";
+ /** A query that only names {@link #KIND}; built once in the static initializer. */
+ private static final Query QUERY;
+ /** Read options created from {@link #PROJECT_ID}, {@link #QUERY} and {@link #NAMESPACE}. */
+ private static final V1Options V_1_OPTIONS;
+ static {
+ Query.Builder q = Query.newBuilder();
+ q.addKindBuilder().setName(KIND);
+ QUERY = q.build();
+ V_1_OPTIONS = V1Options.from(PROJECT_ID, QUERY, NAMESPACE);
+ }
+
+ @Mock
+ Datastore mockDatastore;
+ @Mock
+ QuerySplitter mockQuerySplitter;
+ @Mock
+ V1DatastoreFactory mockDatastoreFactory;
+
+ @Rule
+ public final ExpectedException thrown = ExpectedException.none();
+
+ /**
+ * Initializes the {@code @Mock} fields and stubs the mock factory so that it hands out the mock
+ * Datastore client and the mock query splitter.
+ */
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+
+ when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class)))
+ .thenReturn(mockDatastore);
+ when(mockDatastoreFactory.getQuerySplitter())
+ .thenReturn(mockQuerySplitter);
+ }
+
+ /** Checks that the Read builder methods retain the project id, query and namespace. */
+ @Test
+ public void testBuildRead() throws Exception {
+ DatastoreV1.Read built = DatastoreIO.v1().read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(QUERY)
+ .withNamespace(NAMESPACE);
+
+ assertEquals(PROJECT_ID, built.getProjectId());
+ assertEquals(QUERY, built.getQuery());
+ assertEquals(NAMESPACE, built.getNamespace());
+ }
+
+ /**
+ * Same checks as {@link #testBuildRead}, with the namespace set before the query.
+ */
+ @Test
+ public void testBuildReadAlt() throws Exception {
+ DatastoreV1.Read built = DatastoreIO.v1().read()
+ .withProjectId(PROJECT_ID)
+ .withNamespace(NAMESPACE)
+ .withQuery(QUERY);
+
+ assertEquals(PROJECT_ID, built.getProjectId());
+ assertEquals(QUERY, built.getQuery());
+ assertEquals(NAMESPACE, built.getNamespace());
+ }
+
+ /** Validating a Read that has a query but no project id must fail with an NPE. */
+ @Test
+ public void testReadValidationFailsProject() throws Exception {
+ DatastoreV1.Read missingProject = DatastoreIO.v1().read().withQuery(QUERY);
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("project");
+
+ missingProject.validate(null);
+ }
+
+ /** Validating a Read that has a project id but no query must fail with an NPE. */
+ @Test
+ public void testReadValidationFailsQuery() throws Exception {
+ DatastoreV1.Read missingQuery = DatastoreIO.v1().read().withProjectId(PROJECT_ID);
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("query");
+
+ missingQuery.validate(null);
+ }
+
+ /** A query whose limit is zero must be rejected by {@code withQuery}. */
+ @Test
+ public void testReadValidationFailsQueryLimitZero() throws Exception {
+ Int32Value.Builder zero = Int32Value.newBuilder().setValue(0);
+ Query invalidLimit = Query.newBuilder().setLimit(zero).build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid query limit 0: must be positive");
+
+ DatastoreIO.v1().read().withQuery(invalidLimit);
+ }
+
+ /** A query whose limit is negative must be rejected by {@code withQuery}. */
+ @Test
+ public void testReadValidationFailsQueryLimitNegative() throws Exception {
+ Int32Value.Builder minusFive = Int32Value.newBuilder().setValue(-5);
+ Query invalidLimit = Query.newBuilder().setLimit(minusFive).build();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid query limit -5: must be positive");
+
+ DatastoreIO.v1().read().withQuery(invalidLimit);
+ }
+
+ /** Validation passes when no namespace was set: a null namespace is allowed. */
+ @Test
+ public void testReadValidationSucceedsNamespace() throws Exception {
+ DatastoreV1.Read withoutNamespace =
+ DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY);
+
+ withoutNamespace.validate(null);
+ }
+
+ /** The Read transform exposes project id, query and namespace as display data. */
+ @Test
+ public void testReadDisplayData() {
+ DisplayData displayData = DisplayData.from(
+ DatastoreIO.v1().read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(QUERY)
+ .withNamespace(NAMESPACE));
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
+ assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+ }
+
+ /** The primitive source transforms of a Read carry the project id in their display data. */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testSourcePrimitiveDisplayData() {
+ Query emptyQuery = Query.newBuilder().build();
+ PTransform<PBegin, ? extends POutput> read =
+ DatastoreIO.v1().read().withProjectId("myProject").withQuery(emptyQuery);
+
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+
+ assertThat("DatastoreIO read should include the project in its primitive display data",
+ displayData, hasItem(hasDisplayItem("projectId")));
+ }
+
+ /** Setting a null project id on a Write must fail immediately with an NPE. */
+ @Test
+ public void testWriteDoesNotAllowNullProject() throws Exception {
+ Write unconfigured = DatastoreIO.v1().write();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ unconfigured.withProjectId(null);
+ }
+
+ /** Validating a Write that never received a project id must fail with an NPE. */
+ @Test
+ public void testWriteValidationFailsWithNoProject() throws Exception {
+ Write missingProject = DatastoreIO.v1().write();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ missingProject.validate(null);
+ }
+
+ /** Validating a Write that has a project id must not throw. */
+ @Test
+ public void testWriteValidationSucceedsWithProject() throws Exception {
+ Write configured = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+
+ configured.validate(null);
+ }
+
+ /** The Write transform exposes its project id as display data. */
+ @Test
+ public void testWriteDisplayData() {
+ DisplayData displayData =
+ DisplayData.from(DatastoreIO.v1().write().withProjectId(PROJECT_ID));
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
+
+ /** Setting a null project id on a DeleteEntity must fail immediately with an NPE. */
+ @Test
+ public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
+ DeleteEntity unconfigured = DatastoreIO.v1().deleteEntity();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ unconfigured.withProjectId(null);
+ }
+
+ /** Validating a DeleteEntity that never received a project id must fail with an NPE. */
+ @Test
+ public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
+ DeleteEntity missingProject = DatastoreIO.v1().deleteEntity();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ missingProject.validate(null);
+ }
+
+ /** Validating a DeleteEntity that has a project id must not throw. */
+ @Test
+ public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
+ DeleteEntity configured = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
+
+ configured.validate(null);
+ }
+
+ /** The DeleteEntity transform exposes its project id as display data. */
+ @Test
+ public void testDeleteEntityDisplayData() {
+ DisplayData displayData =
+ DisplayData.from(DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID));
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
+
+ /** Setting a null project id on a DeleteKey must fail immediately with an NPE. */
+ @Test
+ public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
+ DeleteKey unconfigured = DatastoreIO.v1().deleteKey();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ unconfigured.withProjectId(null);
+ }
+
+ /** Validating a DeleteKey that never received a project id must fail with an NPE. */
+ @Test
+ public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
+ DeleteKey missingProject = DatastoreIO.v1().deleteKey();
+
+ thrown.expect(NullPointerException.class);
+ thrown.expectMessage("projectId");
+
+ missingProject.validate(null);
+ }
+
+ /** Validating a DeleteKey that has a project id must not throw. */
+ @Test
+ public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
+ DeleteKey configured = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
+
+ configured.validate(null);
+ }
+
+ /** The DeleteKey transform exposes its project id as display data. */
+ @Test
+ public void testDeleteKeyDisplayData() {
+ DisplayData displayData =
+ DisplayData.from(DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID));
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
+
+ /** The primitive transforms of a Write carry the project id and the upsertFn display items. */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testWritePrimitiveDisplayData() {
+ PTransform<PCollection<Entity>, ?> upsert =
+ DatastoreIO.v1().write().withProjectId("myProject");
+
+ Set<DisplayData> displayData =
+ DisplayDataEvaluator.create().displayDataForPrimitiveTransforms(upsert);
+
+ assertThat("DatastoreIO write should include the project in its primitive display data",
+ displayData, hasItem(hasDisplayItem("projectId")));
+ assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
+ displayData, hasItem(hasDisplayItem("upsertFn")));
+ }
+
+ /**
+ * The primitive transforms of a DeleteEntity carry the project id and the deleteEntityFn
+ * display items.
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDeleteEntityPrimitiveDisplayData() {
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Entity>, ?> deleteEntity =
+ DatastoreIO.v1().deleteEntity().withProjectId("myProject");
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(deleteEntity);
+ // Failure messages name the transform under test (they previously said "write").
+ assertThat("DatastoreIO deleteEntity should include the project in its primitive display data",
+ displayData, hasItem(hasDisplayItem("projectId")));
+ assertThat("DatastoreIO deleteEntity should include deleteEntityFn in its primitive display data",
+ displayData, hasItem(hasDisplayItem("deleteEntityFn")));
+ }
+
+ /**
+ * The primitive transforms of a DeleteKey carry the project id and the deleteKeyFn display
+ * items.
+ */
+ @Test
+ @Category(RunnableOnService.class)
+ public void testDeleteKeyPrimitiveDisplayData() {
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Key>, ?> deleteKey =
+ DatastoreIO.v1().deleteKey().withProjectId("myProject");
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(deleteKey);
+ // Failure messages name the transform under test (they previously said "write").
+ assertThat("DatastoreIO deleteKey should include the project in its primitive display data",
+ displayData, hasItem(hasDisplayItem("projectId")));
+ assertThat("DatastoreIO deleteKey should include the deleteKeyFn in its primitive display data",
+ displayData, hasItem(hasDisplayItem("deleteKeyFn")));
+ }
+
+ /**
+ * Checks that a Write built through the builder methods retains its project id.
+ */
+ @Test
+ public void testBuildWrite() throws Exception {
+ DatastoreV1.Write built = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+
+ assertEquals(PROJECT_ID, built.getProjectId());
+ }
+
+ /**
+ * Exercises {@code isValidKey} on complete and incomplete keys, with and without an ancestor,
+ * and on a key with no path at all.
+ */
+ @Test
+ public void testHasNameOrId() {
+ // Keys without an ancestor: name, id, and neither.
+ Key namedKey = makeKey("bird", "finch").build();
+ assertTrue(isValidKey(namedKey));
+
+ Key idKey = makeKey("bird", 123).build();
+ assertTrue(isValidKey(idKey));
+
+ Key incompleteKey = makeKey("bird").build();
+ assertFalse(isValidKey(incompleteKey));
+
+ // Keys below the ancestor ("bird", "owl"): name, id, and neither.
+ Key ancestor = makeKey("bird", "owl").build();
+
+ Key namedChild = makeKey(ancestor, "bird", "horned").build();
+ assertTrue(isValidKey(namedChild));
+
+ Key idChild = makeKey(ancestor, "bird", 123).build();
+ assertTrue(isValidKey(idChild));
+
+ Key incompleteChild = makeKey(ancestor, "bird").build();
+ assertFalse(isValidKey(incompleteChild));
+
+ // A key built from no path elements.
+ Key emptyKey = makeKey().build();
+ assertFalse(isValidKey(emptyKey));
+ }
+
+ /**
+ * An entity whose key is incomplete must be rejected by {@link UpsertFn}.
+ */
+ @Test
+ public void testAddEntitiesWithIncompleteKeys() throws Exception {
+ Key incompleteKey = makeKey("bird").build();
+ Entity unwritable = Entity.newBuilder().setKey(incompleteKey).build();
+ UpsertFn toUpsert = new UpsertFn();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
+
+ toUpsert.apply(unwritable);
+ }
+
+ /**
+ * Test that entities with valid keys are transformed to upsert mutations.
+ */
+ @Test
+ public void testAddEntities() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ UpsertFn upsertFn = new UpsertFn();
+
+ Mutation expectedMutation = makeUpsert(entity).build();
+ // Expected value first, so a failure reports expected/actual the right way round.
+ assertEquals(expectedMutation, upsertFn.apply(entity));
+ }
+
+ /**
+ * An entity whose key is incomplete must be rejected by {@link DeleteEntityFn}.
+ */
+ @Test
+ public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
+ Key incompleteKey = makeKey("bird").build();
+ Entity undeletable = Entity.newBuilder().setKey(incompleteKey).build();
+ DeleteEntityFn toDelete = new DeleteEntityFn();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
+
+ toDelete.apply(undeletable);
+ }
+
+ /**
+ * Test that entities with valid keys are transformed to delete mutations.
+ */
+ @Test
+ public void testDeleteEntities() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+
+ Mutation expectedMutation = makeDelete(entity.getKey()).build();
+ // Expected value first, so a failure reports expected/actual the right way round.
+ assertEquals(expectedMutation, deleteEntityFn.apply(entity));
+ }
+
+ /**
+ * An incomplete key must be rejected by {@link DeleteKeyFn}.
+ */
+ @Test
+ public void testDeleteIncompleteKeys() throws Exception {
+ Key incompleteKey = makeKey("bird").build();
+ DeleteKeyFn toDelete = new DeleteKeyFn();
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
+
+ toDelete.apply(incompleteKey);
+ }
+
+ /**
+ * Test that valid keys are transformed to delete mutations.
+ */
+ @Test
+ public void testDeleteKeys() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+
+ Mutation expectedMutation = makeDelete(key).build();
+ // Expected value first, so a failure reports expected/actual the right way round.
+ assertEquals(expectedMutation, deleteKeyFn.apply(key));
+ }
+
+ /** {@link DatastoreWriterFn} exposes its project id as display data. */
+ @Test
+ public void testDatastoreWriteFnDisplayData() {
+ DisplayData displayData = DisplayData.from(new DatastoreWriterFn(PROJECT_ID));
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
+
+ /** Tests {@link DatastoreWriterFn} with 100 mutations. */
+ @Test
+ public void testDatatoreWriterFnWithOneBatch() throws Exception {
+ int numMutations = 100;
+ datastoreWriterFnTest(numMutations);
+ }
+
+ /** Tests {@link DatastoreWriterFn} with three full batches of mutations plus 100 more. */
+ @Test
+ public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
+ int numMutations = DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100;
+ datastoreWriterFnTest(numMutations);
+ }
+
+ /**
+ * Tests {@link DatastoreWriterFn} with exactly two full batches of mutations, i.e. an exact
+ * multiple of the write batch size.
+ */
+ @Test
+ public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
+ int numMutations = DATASTORE_BATCH_UPDATE_LIMIT * 2;
+ datastoreWriterFnTest(numMutations);
+ }
+
+ /**
+ * Feeds {@code numMutations} upsert mutations through a {@link DatastoreWriterFn} backed by the
+ * mock factory, then verifies that one non-transactional commit was issued per slice of at
+ * most {@code DATASTORE_BATCH_UPDATE_LIMIT} mutations, in input order.
+ */
+ private void datastoreWriterFnTest(int numMutations) throws Exception {
+ // Build the input: one upsert per index, each with a distinct key.
+ List<Mutation> upserts = new ArrayList<>(numMutations);
+ for (int index = 0; index < numMutations; index++) {
+ Entity entity = Entity.newBuilder().setKey(makeKey("key" + index, index + 1)).build();
+ upserts.add(makeUpsert(entity).build());
+ }
+
+ DoFnTester<Mutation, Void> tester =
+ DoFnTester.of(new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory));
+ tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ tester.processBundle(upserts);
+
+ // Walk the input one batch at a time and check the commit made for each slice.
+ for (int from = 0; from < numMutations; from += DATASTORE_BATCH_UPDATE_LIMIT) {
+ int to = Math.min(numMutations, from + DATASTORE_BATCH_UPDATE_LIMIT);
+ CommitRequest expectedCommit = CommitRequest.newBuilder()
+ .setMode(CommitRequest.Mode.NON_TRANSACTIONAL)
+ .addAllMutations(upserts.subList(from, to))
+ .build();
+ verify(mockDatastore, times(1)).commit(expectedCommit);
+ }
+ }
+
+ /**
+ * Tests that {@link DatastoreV1.Read#getEstimatedSizeBytes} runs the per-kind statistics query
+ * once and returns the entity size found in the response.
+ */
+ @Test
+ public void testEstimatedSizeBytes() throws Exception {
+ long expectedBytes = 100L;
+ // The statistics request the code under test should send, and the canned response to it.
+ RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+ when(mockDatastore.runQuery(statRequest))
+ .thenReturn(makeStatKindResponse(expectedBytes));
+
+ long actualBytes = getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE);
+
+ assertEquals(expectedBytes, actualBytes);
+ verify(mockDatastore, times(1)).runQuery(statRequest);
+ }
+
+ /**
+ * Tests {@link SplitQueryFn} when the number of query splits is specified: the splitter is
+ * asked for exactly that many splits and the Datastore client is never touched.
+ */
+ @Test
+ public void testSplitQueryFnWithNumSplits() throws Exception {
+ int numSplits = 100;
+ when(mockQuerySplitter.getSplits(
+ eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
+ .thenReturn(splitQuery(QUERY, numSplits));
+
+ SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
+ DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+ /*
+ * Although Datastore client is marked transient in SplitQueryFn, when injected through
+ * mock factory using a when clause for unit testing purposes, it is not serializable
+ * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
+ * doFn from being serialized.
+ */
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
+
+ // Expected value first, so a failure reports expected/actual the right way round.
+ assertEquals(numSplits, queries.size());
+ verifyUniqueKeys(queries);
+ verify(mockQuerySplitter, times(1)).getSplits(
+ eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
+ verifyZeroInteractions(mockDatastore);
+ }
+
+ /**
+ * Tests {@link SplitQueryFn} when no number of query splits is specified: the split count is
+ * then derived from the estimated size returned by the per-kind statistics query.
+ */
+ @Test
+ public void testSplitQueryFnWithoutNumSplits() throws Exception {
+ // Force SplitQueryFn to compute the number of query splits
+ int numSplits = 0;
+ int expectedNumSplits = 20;
+ long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
+
+ // Per Kind statistics request and response
+ RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
+ RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+ when(mockDatastore.runQuery(statRequest))
+ .thenReturn(statResponse);
+ when(mockQuerySplitter.getSplits(
+ eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
+ .thenReturn(splitQuery(QUERY, expectedNumSplits));
+
+ SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
+ DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+ // Cloning is disabled, as in the other DoFnTester-based tests that inject the mock factory.
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
+
+ // Expected value first, so a failure reports expected/actual the right way round.
+ assertEquals(expectedNumSplits, queries.size());
+ verifyUniqueKeys(queries);
+ verify(mockQuerySplitter, times(1)).getSplits(
+ eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
+ verify(mockDatastore, times(1)).runQuery(statRequest);
+ }
+
+ /**
+ * Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit: the
+ * query is emitted as a single split and neither the splitter nor the Datastore client is
+ * used.
+ */
+ @Test
+ public void testSplitQueryFnWithQueryLimit() throws Exception {
+ // toBuilder() already returns a fresh builder, so no extra clone() is needed.
+ Query queryWithLimit = QUERY.toBuilder()
+ .setLimit(Int32Value.newBuilder().setValue(1))
+ .build();
+
+ SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory);
+ DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
+
+ // Expected value first, so a failure reports expected/actual the right way round.
+ assertEquals(1, queries.size());
+ verifyUniqueKeys(queries);
+ verifyNoMoreInteractions(mockDatastore);
+ verifyNoMoreInteractions(mockQuerySplitter);
+ }
+
+ /** Tests {@link ReadFn} with a query limit of 5 entities. */
+ @Test
+ public void testReadFnWithOneBatch() throws Exception {
+ int numEntities = 5;
+ readFnTest(numEntities);
+ }
+
+ /** Tests {@link ReadFn} with a query limit of one full batch plus 5 entities. */
+ @Test
+ public void testReadFnWithMultipleBatches() throws Exception {
+ int numEntities = QUERY_BATCH_LIMIT + 5;
+ readFnTest(numEntities);
+ }
+
+ /** Tests {@link ReadFn} with a query limit of exactly five full batches. */
+ @Test
+ public void testReadFnWithBatchesExactMultiple() throws Exception {
+ int numEntities = 5 * QUERY_BATCH_LIMIT;
+ readFnTest(numEntities);
+ }
+
+ // Helper methods.
+
+ /**
+ * Asserts that every query in {@code queries} is paired with a distinct key, by checking that
+ * the set of keys is as large as the list itself.
+ */
+ private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
+ Set<Integer> keys = new HashSet<>();
+ for (KV<Integer, Query> kv : queries) {
+ keys.add(kv.getKey());
+ }
+ // Expected value (one key per query) goes first.
+ assertEquals(queries.size(), keys.size());
+ }
+
+ /**
+ * Fabricates the response to a query: as many {@link Entity} results as the query's limit.
+ * Asserts that the limit is present and within [1, {@link DatastoreV1.Read#QUERY_BATCH_LIMIT}].
+ * The batch reports more results pending exactly when the limit equals the batch limit.
+ */
+ private static RunQueryResponse mockResponseForQuery(Query q) {
+ // Every query DatastoreV1 sends should carry a limit in the range [1, QUERY_BATCH_LIMIT].
+ assertTrue(q.hasLimit());
+ int requested = q.getLimit().getValue();
+ assertThat(requested, greaterThanOrEqualTo(1));
+ assertThat(requested, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
+
+ // One result per requested entity, each with a distinct key.
+ List<EntityResult> results = new ArrayList<>(requested);
+ for (int index = 0; index < requested; index++) {
+ Entity.Builder entity = Entity.newBuilder().setKey(makeKey("key" + index, index + 1));
+ results.add(EntityResult.newBuilder().setEntity(entity).build());
+ }
+
+ // A full batch signals that more results follow; a short batch ends the query.
+ QueryResultBatch.MoreResultsType moreResults;
+ if (requested == QUERY_BATCH_LIMIT) {
+ moreResults = QueryResultBatch.MoreResultsType.NOT_FINISHED;
+ } else {
+ moreResults = QueryResultBatch.MoreResultsType.NO_MORE_RESULTS;
+ }
+
+ RunQueryResponse.Builder response = RunQueryResponse.newBuilder();
+ response.getBatchBuilder()
+ .addAllEntityResults(results)
+ .setEntityResultType(EntityResult.ResultType.FULL)
+ .setMoreResults(moreResults);
+ return response.build();
+ }
+
+ /**
+ * Runs a {@link ReadFn} over a kind-less query limited to {@code numEntities} against the mock
+ * Datastore, then checks how many times {@code runQuery} was called and how many entities
+ * were output.
+ */
+ private void readFnTest(int numEntities) throws Exception {
+ // An empty query to read entities.
+ Int32Value.Builder limit = Int32Value.newBuilder().setValue(numEntities);
+ Query limitedQuery = Query.newBuilder().setLimit(limit).build();
+
+ // Answer every runQuery call with the response mockResponseForQuery builds for its query.
+ Answer<RunQueryResponse> fabricatedResponse = new Answer<RunQueryResponse>() {
+ @Override
+ public RunQueryResponse answer(InvocationOnMock invocation) throws Throwable {
+ RunQueryRequest request = (RunQueryRequest) invocation.getArguments()[0];
+ return mockResponseForQuery(request.getQuery());
+ }
+ };
+ when(mockDatastore.runQuery(any(RunQueryRequest.class))).thenAnswer(fabricatedResponse);
+
+ DoFnTester<Query, Entity> tester = DoFnTester.of(new ReadFn(V_1_OPTIONS, mockDatastoreFactory));
+ /*
+ * Although Datastore client is marked transient in ReadFn, when injected through
+ * mock factory using a when clause for unit testing purposes, it is not serializable
+ * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
+ * test object from being serialized.
+ */
+ tester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<Entity> output = tester.processBundle(limitedQuery);
+
+ // One runQuery call per batch, rounding the last partial batch up.
+ int expectedNumCallsToRunQuery = (numEntities + QUERY_BATCH_LIMIT - 1) / QUERY_BATCH_LIMIT;
+ verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
+ // Validate the number of results.
+ assertEquals(numEntities, output.size());
+ }
+
+ /**
+ * Builds a query response holding a single entity whose {@code entity_bytes} property is set
+ * to {@code entitySizeInBytes}.
+ */
+ private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
+ // The statistics entity: a dummy key plus the size property.
+ Entity.Builder statEntity = Entity.newBuilder();
+ statEntity.setKey(makeKey("dummyKind", "dummyId"));
+ statEntity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
+
+ // Wrap it: entity -> entity result -> result batch -> response.
+ EntityResult.Builder statResult = EntityResult.newBuilder();
+ statResult.setEntity(statEntity);
+ QueryResultBatch.Builder statBatch = QueryResultBatch.newBuilder();
+ statBatch.addEntityResults(statResult);
+
+ RunQueryResponse.Builder statResponse = RunQueryResponse.newBuilder();
+ statResponse.setBatch(statBatch);
+ return statResponse.build();
+ }
+
+ /**
+ * Builds the per-kind statistics query for {@link #KIND}: filtered on {@code kind_name}, ordered
+ * by {@code timestamp} descending and limited to one result.
+ *
+ * @param namespace when {@code null} the query targets {@code __Stat_Kind__}, otherwise
+ * {@code __Ns_Stat_Kind__}
+ */
+ private static Query makeStatKindQuery(String namespace) {
+ Query.Builder statQuery = Query.newBuilder();
+ statQuery.addKindBuilder()
+ .setName(namespace == null ? "__Stat_Kind__" : "__Ns_Stat_Kind__");
+ statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
+ statQuery.addOrder(makeOrder("timestamp", DESCENDING));
+ statQuery.setLimit(Int32Value.newBuilder().setValue(1));
+ return statQuery.build();
+ }
+
+ /** Returns {@code numSplits} copies of {@code query}, standing in for real query splits. */
+ private List<Query> splitQuery(Query query, int numSplits) {
+ List<Query> splits = new LinkedList<>();
+ for (int remaining = numSplits; remaining > 0; remaining--) {
+ Query copy = query.toBuilder().clone().build();
+ splits.add(copy);
+ }
+ return splits;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java
deleted file mode 100644
index ddb6d81..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3ReadIT.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.getDatastore;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.makeAncestorKey;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.makeEntity;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.UpsertMutationBuilder;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.V1Beta3TestWriter;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.PAssert;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.values.PCollection;
-
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.client.Datastore;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.UUID;
-
-/**
- * End-to-end tests for Datastore V1Beta3.Read.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3ReadIT {
- private V1Beta3TestOptions options;
- private String ancestor;
- private final long numEntities = 1000;
-
- @Before
- public void setup() {
- PipelineOptionsFactory.register(V1Beta3TestOptions.class);
- options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class);
- ancestor = UUID.randomUUID().toString();
- }
-
- /**
- * An end-to-end test for {@link V1Beta3.Read}.
- *
- * <p>Write some test entities to datastore and then run a dataflow pipeline that
- * reads and counts the total number of entities. Verify that the count matches the
- * number of entities written.
- */
- @Test
- public void testE2EV1Beta3Read() throws Exception {
- // Create entities and write them to datastore
- writeEntitiesToDatastore(options, ancestor, numEntities);
-
- // Read from datastore
- Query query = V1Beta3TestUtil.makeAncestorKindQuery(
- options.getKind(), options.getNamespace(), ancestor);
-
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(options.getProject())
- .withQuery(query)
- .withNamespace(options.getNamespace());
-
- // Count the total number of entities
- Pipeline p = Pipeline.create(options);
- PCollection<Long> count = p
- .apply(read)
- .apply(Count.<Entity>globally());
-
- PAssert.thatSingleton(count).isEqualTo(numEntities);
- p.run();
- }
-
- // Creates entities and writes them to datastore.
- private static void writeEntitiesToDatastore(V1Beta3TestOptions options, String ancestor,
- long numEntities) throws Exception {
- Datastore datastore = getDatastore(options, options.getProject());
- // Write test entities to datastore
- V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new UpsertMutationBuilder());
- Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
-
- for (long i = 0; i < numEntities; i++) {
- Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
- writer.write(entity);
- }
- writer.close();
- }
-
- @After
- public void tearDown() throws Exception {
- deleteAllEntities(options, ancestor);
- }
-}
[2/4] incubator-beam git commit: DatastoreIO v1beta3 to v1
Posted by dh...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
deleted file mode 100644
index b0c6c18..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3Test.java
+++ /dev/null
@@ -1,792 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DATASTORE_BATCH_UPDATE_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.DEFAULT_BUNDLE_SIZE_BYTES;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.QUERY_BATCH_LIMIT;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.getEstimatedSizeBytes;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.makeRequest;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3.isValidKey;
-import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
-import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.hamcrest.Matchers.hasItem;
-import static org.hamcrest.Matchers.lessThanOrEqualTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.verifyZeroInteractions;
-import static org.mockito.Mockito.when;
-
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DatastoreWriterFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntity;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteEntityFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKey;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.DeleteKeyFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.ReadFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.SplitQueryFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Read.V1Beta3Options;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.UpsertFn;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.V1Beta3DatastoreFactory;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3.Write;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.testing.RunnableOnService;
-import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.DoFnTester.CloningBehavior;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
-import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.POutput;
-
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PartitionId;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.QuerySplitter;
-import com.google.protobuf.Int32Value;
-
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-import org.mockito.Mock;
-import org.mockito.MockitoAnnotations;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Tests for {@link V1Beta3}.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3Test {
- private static final String PROJECT_ID = "testProject";
- private static final String NAMESPACE = "testNamespace";
- private static final String KIND = "testKind";
- private static final Query QUERY;
- private static final V1Beta3Options v1Beta3Options;
- static {
- Query.Builder q = Query.newBuilder();
- q.addKindBuilder().setName(KIND);
- QUERY = q.build();
- v1Beta3Options = V1Beta3Options.from(PROJECT_ID, QUERY, NAMESPACE);
- }
- private V1Beta3.Read initialRead;
-
- @Mock
- Datastore mockDatastore;
- @Mock
- QuerySplitter mockQuerySplitter;
- @Mock
- V1Beta3DatastoreFactory mockDatastoreFactory;
-
- @Rule
- public final ExpectedException thrown = ExpectedException.none();
-
- @Before
- public void setUp() {
- MockitoAnnotations.initMocks(this);
-
- initialRead = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-
- when(mockDatastoreFactory.getDatastore(any(PipelineOptions.class), any(String.class)))
- .thenReturn(mockDatastore);
- when(mockDatastoreFactory.getQuerySplitter())
- .thenReturn(mockQuerySplitter);
- }
-
- @Test
- public void testBuildRead() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId());
- assertEquals(NAMESPACE, read.getNamespace());
- }
-
- /**
- * {@link #testBuildRead} but constructed in a different order.
- */
- @Test
- public void testBuildReadAlt() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID).withNamespace(NAMESPACE).withQuery(QUERY);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId());
- assertEquals(NAMESPACE, read.getNamespace());
- }
-
- @Test
- public void testReadValidationFailsProject() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withQuery(QUERY);
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("project");
- read.validate(null);
- }
-
- @Test
- public void testReadValidationFailsQuery() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID);
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("query");
- read.validate(null);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitZero() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit 0: must be positive");
-
- DatastoreIO.v1beta3().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitNegative() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit -5: must be positive");
-
- DatastoreIO.v1beta3().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationSucceedsNamespace() throws Exception {
- V1Beta3.Read read = DatastoreIO.v1beta3().read().withProjectId(PROJECT_ID).withQuery(QUERY);
- /* Should succeed, as a null namespace is fine. */
- read.validate(null);
- }
-
- @Test
- public void testReadDisplayData() {
- V1Beta3.Read read = DatastoreIO.v1beta3().read()
- .withProjectId(PROJECT_ID)
- .withQuery(QUERY)
- .withNamespace(NAMESPACE);
-
- DisplayData displayData = DisplayData.from(read);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
- assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testSourcePrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PBegin, ? extends POutput> read = DatastoreIO.v1beta3().read().withProjectId(
- "myProject").withQuery(Query.newBuilder().build());
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat("DatastoreIO read should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- }
-
- @Test
- public void testWriteDoesNotAllowNullProject() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- DatastoreIO.v1beta3().write().withProjectId(null);
- }
-
- @Test
- public void testWriteValidationFailsWithNoProject() throws Exception {
- Write write = DatastoreIO.v1beta3().write();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- write.validate(null);
- }
-
- @Test
- public void testWriteValidationSucceedsWithProject() throws Exception {
- Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
- write.validate(null);
- }
-
- @Test
- public void testWriteDisplayData() {
- Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
-
- DisplayData displayData = DisplayData.from(write);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- @Test
- public void testDeleteEntityDoesNotAllowNullProject() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- DatastoreIO.v1beta3().deleteEntity().withProjectId(null);
- }
-
- @Test
- public void testDeleteEntityValidationFailsWithNoProject() throws Exception {
- DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- deleteEntity.validate(null);
- }
-
- @Test
- public void testDeleteEntityValidationSucceedsWithProject() throws Exception {
- DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
- deleteEntity.validate(null);
- }
-
- @Test
- public void testDeleteEntityDisplayData() {
- DeleteEntity deleteEntity = DatastoreIO.v1beta3().deleteEntity().withProjectId(PROJECT_ID);
-
- DisplayData displayData = DisplayData.from(deleteEntity);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- @Test
- public void testDeleteKeyDoesNotAllowNullProject() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- DatastoreIO.v1beta3().deleteKey().withProjectId(null);
- }
-
- @Test
- public void testDeleteKeyValidationFailsWithNoProject() throws Exception {
- DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey();
-
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("projectId");
-
- deleteKey.validate(null);
- }
-
- @Test
- public void testDeleteKeyValidationSucceedsWithProject() throws Exception {
- DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
- deleteKey.validate(null);
- }
-
- @Test
- public void testDeleteKeyDisplayData() {
- DeleteKey deleteKey = DatastoreIO.v1beta3().deleteKey().withProjectId(PROJECT_ID);
-
- DisplayData displayData = DisplayData.from(deleteKey);
-
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testWritePrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1beta3().write().withProjectId("myProject");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- assertThat("DatastoreIO write should include the upsertFn in its primitive display data",
- displayData, hasItem(hasDisplayItem("upsertFn")));
-
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testDeleteEntityPrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1beta3().deleteEntity().withProjectId("myProject");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- assertThat("DatastoreIO write should include the deleteEntityFn in its primitive display data",
- displayData, hasItem(hasDisplayItem("deleteEntityFn")));
-
- }
-
- @Test
- @Category(RunnableOnService.class)
- public void testDeleteKeyPrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Key>, ?> write =
- DatastoreIO.v1beta3().deleteKey().withProjectId("myProject");
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the project in its primitive display data",
- displayData, hasItem(hasDisplayItem("projectId")));
- assertThat("DatastoreIO write should include the deleteKeyFn in its primitive display data",
- displayData, hasItem(hasDisplayItem("deleteKeyFn")));
-
- }
-
- /**
- * Test building a Write using builder methods.
- */
- @Test
- public void testBuildWrite() throws Exception {
- V1Beta3.Write write = DatastoreIO.v1beta3().write().withProjectId(PROJECT_ID);
- assertEquals(PROJECT_ID, write.getProjectId());
- }
-
- /**
- * Test the detection of complete and incomplete keys.
- */
- @Test
- public void testHasNameOrId() {
- Key key;
- // Complete with name, no ancestor
- key = makeKey("bird", "finch").build();
- assertTrue(isValidKey(key));
-
- // Complete with id, no ancestor
- key = makeKey("bird", 123).build();
- assertTrue(isValidKey(key));
-
- // Incomplete, no ancestor
- key = makeKey("bird").build();
- assertFalse(isValidKey(key));
-
- // Complete with name and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", "horned").build();
- assertTrue(isValidKey(key));
-
- // Complete with id and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", 123).build();
- assertTrue(isValidKey(key));
-
- // Incomplete with ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird").build();
- assertFalse(isValidKey(key));
-
- key = makeKey().build();
- assertFalse(isValidKey(key));
- }
-
- /**
- * Test that entities with incomplete keys cannot be updated.
- */
- @Test
- public void testAddEntitiesWithIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- UpsertFn upsertFn = new UpsertFn();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Entities to be written to the Datastore must have complete keys");
-
- upsertFn.apply(entity);
- }
-
- @Test
- /**
- * Test that entities with valid keys are transformed to upsert mutations.
- */
- public void testAddEntities() throws Exception {
- Key key = makeKey("bird", "finch").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- UpsertFn upsertFn = new UpsertFn();
-
- Mutation exceptedMutation = makeUpsert(entity).build();
- assertEquals(upsertFn.apply(entity), exceptedMutation);
- }
-
- /**
- * Test that entities with incomplete keys cannot be deleted.
- */
- @Test
- public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Entities to be deleted from the Datastore must have complete keys");
-
- deleteEntityFn.apply(entity);
- }
-
- /**
- * Test that entities with valid keys are transformed to delete mutations.
- */
- @Test
- public void testDeleteEntities() throws Exception {
- Key key = makeKey("bird", "finch").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
-
- Mutation exceptedMutation = makeDelete(entity.getKey()).build();
- assertEquals(deleteEntityFn.apply(entity), exceptedMutation);
- }
-
- /**
- * Test that incomplete keys cannot be deleted.
- */
- @Test
- public void testDeleteIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Keys to be deleted from the Datastore must be complete");
-
- deleteKeyFn.apply(key);
- }
-
- /**
- * Test that valid keys are transformed to delete mutations.
- */
- @Test
- public void testDeleteKeys() throws Exception {
- Key key = makeKey("bird", "finch").build();
- DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
-
- Mutation exceptedMutation = makeDelete(key).build();
- assertEquals(deleteKeyFn.apply(key), exceptedMutation);
- }
-
- @Test
- public void testDatastoreWriteFnDisplayData() {
- DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID);
- DisplayData displayData = DisplayData.from(datastoreWriter);
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
-
- /** Tests {@link DatastoreWriterFn} with fewer entities than one batch. */
- @Test
- public void testDatatoreWriterFnWithOneBatch() throws Exception {
- datastoreWriterFnTest(100);
- }
-
- /** Tests {@link DatastoreWriterFn} with more than one batch of entities, but not an exact multiple. */
- @Test
- public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
- datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 3 + 100);
- }
-
- /**
- * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
- * write batch size.
- */
- @Test
- public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
- datastoreWriterFnTest(DATASTORE_BATCH_UPDATE_LIMIT * 2);
- }
-
- // A helper method to test DatastoreWriterFn for various batch sizes.
- private void datastoreWriterFnTest(int numMutations) throws Exception {
- // Create the requested number of mutations.
- List<Mutation> mutations = new ArrayList<>(numMutations);
- for (int i = 0; i < numMutations; ++i) {
- mutations.add(
- makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
- }
-
- DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, mockDatastoreFactory);
- DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- doFnTester.processBundle(mutations);
-
- int start = 0;
- while (start < numMutations) {
- int end = Math.min(numMutations, start + DATASTORE_BATCH_UPDATE_LIMIT);
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- commitRequest.addAllMutations(mutations.subList(start, end));
- // Verify all the batch requests were made with the expected mutations.
- verify(mockDatastore, times(1)).commit(commitRequest.build());
- start = end;
- }
- }
-
- /**
- * Tests {@link V1Beta3.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
- * query.
- */
- @Test
- public void testEstimatedSizeBytes() throws Exception {
- long entityBytes = 100L;
- // Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
- RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
- when(mockDatastore.runQuery(statRequest))
- .thenReturn(statResponse);
-
- assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
- verify(mockDatastore, times(1)).runQuery(statRequest);
- }
-
- /**
- * Tests {@link SplitQueryFn} when number of query splits is specified.
- */
- @Test
- public void testSplitQueryFnWithNumSplits() throws Exception {
- int numSplits = 100;
- when(mockQuerySplitter.getSplits(
- eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
- .thenReturn(splitQuery(QUERY, numSplits));
-
- SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
- DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
- /**
- * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through
- * mock factory using a when clause for unit testing purposes, it is not serializable
- * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
- * doFn from being serialized.
- */
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
- assertEquals(queries.size(), numSplits);
- verifyUniqueKeys(queries);
- verify(mockQuerySplitter, times(1)).getSplits(
- eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
- verifyZeroInteractions(mockDatastore);
- }
-
- /**
- * Tests {@link SplitQueryFn} when the number of query splits is not specified.
- */
- @Test
- public void testSplitQueryFnWithoutNumSplits() throws Exception {
- // Force SplitQueryFn to compute the number of query splits
- int numSplits = 0;
- int expectedNumSplits = 20;
- long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
-
- // Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE), NAMESPACE);
- RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
- when(mockDatastore.runQuery(statRequest))
- .thenReturn(statResponse);
- when(mockQuerySplitter.getSplits(
- eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
- .thenReturn(splitQuery(QUERY, expectedNumSplits));
-
- SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, numSplits, mockDatastoreFactory);
- DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<KV<Integer, Query>> queries = doFnTester.processBundle(QUERY);
-
- assertEquals(queries.size(), expectedNumSplits);
- verifyUniqueKeys(queries);
- verify(mockQuerySplitter, times(1)).getSplits(
- eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
- verify(mockDatastore, times(1)).runQuery(statRequest);
- }
-
- /**
- * Tests {@link V1Beta3.Read.SplitQueryFn} when the query has a user specified limit.
- */
- @Test
- public void testSplitQueryFnWithQueryLimit() throws Exception {
- Query queryWithLimit = QUERY.toBuilder().clone()
- .setLimit(Int32Value.newBuilder().setValue(1))
- .build();
-
- SplitQueryFn splitQueryFn = new SplitQueryFn(v1Beta3Options, 10, mockDatastoreFactory);
- DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<KV<Integer, Query>> queries = doFnTester.processBundle(queryWithLimit);
-
- assertEquals(queries.size(), 1);
- verifyUniqueKeys(queries);
- verifyNoMoreInteractions(mockDatastore);
- verifyNoMoreInteractions(mockQuerySplitter);
- }
-
- /** Tests {@link ReadFn} with a query limit less than one batch. */
- @Test
- public void testReadFnWithOneBatch() throws Exception {
- readFnTest(5);
- }
-
- /** Tests {@link ReadFn} with a query limit of more than one batch that is not an exact multiple. */
- @Test
- public void testReadFnWithMultipleBatches() throws Exception {
- readFnTest(QUERY_BATCH_LIMIT + 5);
- }
-
- /** Tests {@link ReadFn} with several batches, where the result count is an exact multiple of the batch size. */
- @Test
- public void testReadFnWithBatchesExactMultiple() throws Exception {
- readFnTest(5 * QUERY_BATCH_LIMIT);
- }
-
- /** Helper Methods */
-
- /** A helper function that verifies if all the queries have unique keys. */
- private void verifyUniqueKeys(List<KV<Integer, Query>> queries) {
- Set<Integer> keys = new HashSet<>();
- for (KV<Integer, Query> kv: queries) {
- keys.add(kv.getKey());
- }
- assertEquals(keys.size(), queries.size());
- }
-
- /**
- * A helper function that creates mock {@link Entity} results in response to a query. Always
- * indicates that more results are available, unless the batch is limited to fewer than
- * {@link V1Beta3.Read#QUERY_BATCH_LIMIT} results.
- */
- private static RunQueryResponse mockResponseForQuery(Query q) {
- // Every query V1Beta3 sends should have a limit.
- assertTrue(q.hasLimit());
-
- // The limit should be in the range [1, QUERY_BATCH_LIMIT]
- int limit = q.getLimit().getValue();
- assertThat(limit, greaterThanOrEqualTo(1));
- assertThat(limit, lessThanOrEqualTo(QUERY_BATCH_LIMIT));
-
- // Create the requested number of entities.
- List<EntityResult> entities = new ArrayList<>(limit);
- for (int i = 0; i < limit; ++i) {
- entities.add(
- EntityResult.newBuilder()
- .setEntity(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)))
- .build());
- }
-
- // Fill out the other parameters on the returned result batch.
- RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
- ret.getBatchBuilder()
- .addAllEntityResults(entities)
- .setEntityResultType(EntityResult.ResultType.FULL)
- .setMoreResults(
- limit == QUERY_BATCH_LIMIT
- ? QueryResultBatch.MoreResultsType.NOT_FINISHED
- : QueryResultBatch.MoreResultsType.NO_MORE_RESULTS);
-
- return ret.build();
- }
-
- /** Helper function to run a test reading from a {@link ReadFn}. */
- private void readFnTest(int numEntities) throws Exception {
- // An empty query to read entities.
- Query query = Query.newBuilder().setLimit(
- Int32Value.newBuilder().setValue(numEntities)).build();
-
- // Use mockResponseForQuery to generate results.
- when(mockDatastore.runQuery(any(RunQueryRequest.class)))
- .thenAnswer(new Answer<RunQueryResponse>() {
- @Override
- public RunQueryResponse answer(InvocationOnMock invocationOnMock) throws Throwable {
- Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
- return mockResponseForQuery(q);
- }
- });
-
- ReadFn readFn = new ReadFn(v1Beta3Options, mockDatastoreFactory);
- DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
- /**
- * Although Datastore client is marked transient in {@link ReadFn}, when injected through
- * mock factory using a when clause for unit testing purposes, it is not serializable
- * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
- * test object from being serialized.
- */
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<Entity> entities = doFnTester.processBundle(query);
-
- int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
- verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
- // Validate the number of results.
- assertEquals(numEntities, entities.size());
- }
-
- /** Builds a per-kind statistics response with the given entity size. */
- private static RunQueryResponse makeStatKindResponse(long entitySizeInBytes) {
- RunQueryResponse.Builder timestampResponse = RunQueryResponse.newBuilder();
- Entity.Builder entity = Entity.newBuilder();
- entity.setKey(makeKey("dummyKind", "dummyId"));
- entity.getMutableProperties().put("entity_bytes", makeValue(entitySizeInBytes).build());
- EntityResult.Builder entityResult = EntityResult.newBuilder();
- entityResult.setEntity(entity);
- QueryResultBatch.Builder batch = QueryResultBatch.newBuilder();
- batch.addEntityResults(entityResult);
- timestampResponse.setBatch(batch);
- return timestampResponse.build();
- }
-
- /** Builds a per-kind statistics query for the most recent entry; the statistics kind depends on whether a namespace is given. */
- private static Query makeStatKindQuery(String namespace) {
- Query.Builder statQuery = Query.newBuilder();
- if (namespace == null) {
- statQuery.addKindBuilder().setName("__Stat_Kind__");
- } else {
- statQuery.addKindBuilder().setName("__Ns_Stat_Kind__");
- }
- statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
- statQuery.addOrder(makeOrder("timestamp", DESCENDING));
- statQuery.setLimit(Int32Value.newBuilder().setValue(1));
- return statQuery.build();
- }
-
- /** Generate dummy query splits. */
- private List<Query> splitQuery(Query query, int numSplits) {
- List<Query> queries = new LinkedList<>();
- for (int i = 0; i < numSplits; i++) {
- queries.add(query.toBuilder().clone().build());
- }
- return queries;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
deleted file mode 100644
index 099ebe0..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestOptions.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import org.apache.beam.sdk.options.Default;
-import org.apache.beam.sdk.options.Description;
-import org.apache.beam.sdk.testing.TestPipelineOptions;
-
-import javax.annotation.Nullable;
-
-/**
- * V1Beta3 Datastore related pipeline options.
- */
-public interface V1Beta3TestOptions extends TestPipelineOptions {
- @Description("Project ID to read from datastore")
- @Default.String("apache-beam-testing")
- String getProject();
- void setProject(String value);
-
- @Description("Datastore Entity kind")
- @Default.String("beam-test")
- String getKind();
- void setKind(String value);
-
- @Description("Datastore Namespace")
- String getNamespace();
- void setNamespace(@Nullable String value);
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
deleted file mode 100644
index 7eaf23e..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3TestUtil.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeDelete;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
-import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
-import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
-
-import com.google.api.client.auth.oauth2.Credential;
-import com.google.api.client.util.BackOff;
-import com.google.api.client.util.BackOffUtils;
-import com.google.api.client.util.Sleeper;
-import com.google.datastore.v1beta3.CommitRequest;
-import com.google.datastore.v1beta3.Entity;
-import com.google.datastore.v1beta3.EntityResult;
-import com.google.datastore.v1beta3.Key;
-import com.google.datastore.v1beta3.Key.PathElement;
-import com.google.datastore.v1beta3.Mutation;
-import com.google.datastore.v1beta3.PropertyFilter;
-import com.google.datastore.v1beta3.Query;
-import com.google.datastore.v1beta3.QueryResultBatch;
-import com.google.datastore.v1beta3.RunQueryRequest;
-import com.google.datastore.v1beta3.RunQueryResponse;
-import com.google.datastore.v1beta3.client.Datastore;
-import com.google.datastore.v1beta3.client.DatastoreException;
-import com.google.datastore.v1beta3.client.DatastoreFactory;
-import com.google.datastore.v1beta3.client.DatastoreOptions;
-import com.google.protobuf.Int32Value;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.UUID;
-import javax.annotation.Nullable;
-
-class V1Beta3TestUtil {
- private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestUtil.class);
-
- /**
- * A helper function to create the ancestor key for all created and queried entities.
- */
- static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
- Key.Builder keyBuilder = makeKey(kind, ancestor);
- if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
- }
- return keyBuilder.build();
- }
-
- /**
- * Build a datastore ancestor query for the specified kind, namespace and ancestor.
- */
- static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
- Query.Builder q = Query.newBuilder();
- q.addKindBuilder().setName(kind);
- q.setFilter(makeFilter(
- "__key__",
- PropertyFilter.Operator.HAS_ANCESTOR,
- makeValue(makeAncestorKey(namespace, kind, ancestor))));
- return q.build();
- }
-
- /**
- * Build an entity for the given ancestorKey, kind, namespace and value.
- */
- static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
- Entity.Builder entityBuilder = Entity.newBuilder();
- Key.Builder keyBuilder = makeKey(ancestorKey, kind, UUID.randomUUID().toString());
- // NOTE: Namespace is not inherited between keys created with DatastoreHelper.makeKey, so
- // we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
- // we can simplify this code.
- if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
- }
-
- entityBuilder.setKey(keyBuilder.build());
- entityBuilder.getMutableProperties().put("value", makeValue(value).build());
- return entityBuilder.build();
- }
-
- /**
- * A DoFn that creates entity for a long number.
- */
- static class CreateEntityFn extends DoFn<Long, Entity> {
- private final String kind;
- @Nullable
- private final String namespace;
- private Key ancestorKey;
-
- CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
- this.kind = kind;
- this.namespace = namespace;
- // Build the ancestor key for all created entities once, including the namespace.
- ancestorKey = makeAncestorKey(namespace, kind, ancestor);
- }
-
- @ProcessElement
- public void processElement(ProcessContext c) throws Exception {
- c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
- }
- }
-
- /**
- * Build a new datastore client.
- */
- static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
- DatastoreOptions.Builder builder =
- new DatastoreOptions.Builder()
- .projectId(projectId)
- .initializer(
- new RetryHttpRequestInitializer()
- );
-
- Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
- if (credential != null) {
- builder.credential(credential);
- }
- return DatastoreFactory.get().create(builder.build());
- }
-
- /**
- * Build a datastore query request.
- */
- private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
- RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
- if (namespace != null) {
- requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
- }
- return requestBuilder.build();
- }
-
- /**
- * Delete all entities with the given ancestor.
- */
- static void deleteAllEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
- Datastore datastore = getDatastore(options, options.getProject());
- Query query = V1Beta3TestUtil.makeAncestorKindQuery(
- options.getKind(), options.getNamespace(), ancestor);
-
- V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
- V1Beta3TestWriter writer = new V1Beta3TestWriter(datastore, new DeleteMutationBuilder());
-
- long numEntities = 0;
- while (reader.advance()) {
- Entity entity = reader.getCurrent();
- numEntities++;
- writer.write(entity);
- }
-
- writer.close();
- LOG.info("Successfully deleted {} entities", numEntities);
- }
-
- /**
- * Returns the total number of entities for the given datastore.
- */
- static long countEntities(V1Beta3TestOptions options, String ancestor) throws Exception {
- // Read from datastore.
- Datastore datastore = V1Beta3TestUtil.getDatastore(options, options.getProject());
- Query query = V1Beta3TestUtil.makeAncestorKindQuery(
- options.getKind(), options.getNamespace(), ancestor);
-
- V1Beta3TestReader reader = new V1Beta3TestReader(datastore, query, options.getNamespace());
-
- long numEntitiesRead = 0;
- while (reader.advance()) {
- reader.getCurrent();
- numEntitiesRead++;
- }
- return numEntitiesRead;
- }
-
- /**
- * An interface to represent any datastore mutation operation.
- * Mutation operations include insert, delete, upsert, update.
- */
- interface MutationBuilder {
- Mutation.Builder apply(Entity entity);
- }
-
- /**
- *A MutationBuilder that performs upsert operation.
- */
- static class UpsertMutationBuilder implements MutationBuilder {
- public Mutation.Builder apply(Entity entity) {
- return makeUpsert(entity);
- }
- }
-
- /**
- * A MutationBuilder that performs delete operation.
- */
- static class DeleteMutationBuilder implements MutationBuilder {
- public Mutation.Builder apply(Entity entity) {
- return makeDelete(entity.getKey());
- }
- }
-
- /**
- * A helper class to write entities to datastore.
- */
- static class V1Beta3TestWriter {
- private static final Logger LOG = LoggerFactory.getLogger(V1Beta3TestWriter.class);
- // Limits the number of entities updated per batch
- private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
- // Number of times to retry on update failure
- private static final int MAX_RETRIES = 5;
- //Initial backoff time for exponential backoff for retry attempts.
- private static final int INITIAL_BACKOFF_MILLIS = 5000;
-
- // Returns true if a Datastore key is complete. A key is complete if its last element
- // has either an id or a name.
- static boolean isValidKey(Key key) {
- List<PathElement> elementList = key.getPathList();
- if (elementList.isEmpty()) {
- return false;
- }
- PathElement lastElement = elementList.get(elementList.size() - 1);
- return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
- }
-
- private final Datastore datastore;
- private final MutationBuilder mutationBuilder;
- private final List<Entity> entities = new ArrayList<>();
-
- V1Beta3TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
- this.datastore = datastore;
- this.mutationBuilder = mutationBuilder;
- }
-
- void write(Entity value) throws Exception {
- // Verify that the entity to write has a complete key.
- if (!isValidKey(value.getKey())) {
- throw new IllegalArgumentException(
- "Entities to be written to the Datastore must have complete keys");
- }
-
- entities.add(value);
-
- if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
- flushBatch();
- }
- }
-
- void close() throws Exception {
- // flush any remaining entities
- if (entities.size() > 0) {
- flushBatch();
- }
- }
-
- // commit the list of entities to datastore
- private void flushBatch() throws DatastoreException, IOException, InterruptedException {
- LOG.info("Writing batch of {} entities", entities.size());
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
-
- while (true) {
- // Batch mutate entities.
- try {
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- for (Entity entity: entities) {
- commitRequest.addMutations(mutationBuilder.apply(entity));
- }
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- datastore.commit(commitRequest.build());
- // Break if the commit threw no exception.
- break;
- } catch (DatastoreException exception) {
- LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
- exception.getMessage());
- if (!BackOffUtils.next(sleeper, backoff)) {
- LOG.error("Aborting after {} retries.", MAX_RETRIES);
- throw exception;
- }
- }
- }
- LOG.info("Successfully wrote {} entities", entities.size());
- entities.clear();
- }
- }
-
- /**
- * A helper class to read entities from datastore.
- */
- static class V1Beta3TestReader {
- private static final int QUERY_BATCH_LIMIT = 500;
- private final Datastore datastore;
- private final Query query;
- @Nullable
- private final String namespace;
- private boolean moreResults;
- private java.util.Iterator<EntityResult> entities;
- // Current batch of query results
- private QueryResultBatch currentBatch;
- private Entity currentEntity;
-
- V1Beta3TestReader(Datastore datastore, Query query, @Nullable String namespace) {
- this.datastore = datastore;
- this.query = query;
- this.namespace = namespace;
- }
-
- Entity getCurrent() {
- return currentEntity;
- }
-
- boolean advance() throws IOException {
- if (entities == null || (!entities.hasNext() && moreResults)) {
- try {
- entities = getIteratorAndMoveCursor();
- } catch (DatastoreException e) {
- throw new IOException(e);
- }
- }
-
- if (entities == null || !entities.hasNext()) {
- currentEntity = null;
- return false;
- }
-
- currentEntity = entities.next().getEntity();
- return true;
- }
-
- // Read the next batch of query results.
- private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
- Query.Builder query = this.query.toBuilder().clone();
- query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
- if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
- query.setStartCursor(currentBatch.getEndCursor());
- }
-
- RunQueryRequest request = makeRequest(query.build(), namespace);
- RunQueryResponse response = datastore.runQuery(request);
-
- currentBatch = response.getBatch();
-
- int numFetch = currentBatch.getEntityResultsCount();
- // All indications from the API are that there are/may be more results.
- moreResults = ((numFetch == QUERY_BATCH_LIMIT)
- || (currentBatch.getMoreResults() == NOT_FINISHED));
-
- // May receive a batch of 0 results if the number of records is a multiple
- // of the request limit.
- if (numFetch == 0) {
- return null;
- }
-
- return currentBatch.getEntityResultsList().iterator();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
deleted file mode 100644
index 782065f..0000000
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1Beta3WriteIT.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.io.gcp.datastore;
-
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.countEntities;
-import static org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.deleteAllEntities;
-import static org.junit.Assert.assertEquals;
-
-import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.io.CountingInput;
-import org.apache.beam.sdk.io.gcp.datastore.V1Beta3TestUtil.CreateEntityFn;
-import org.apache.beam.sdk.options.PipelineOptionsFactory;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.ParDo;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.UUID;
-
-/**
- * End-to-end tests for Datastore V1Beta3.Write.
- */
-@RunWith(JUnit4.class)
-public class V1Beta3WriteIT {
- private V1Beta3TestOptions options;
- private String ancestor;
- private final long numEntities = 1000;
-
- @Before
- public void setup() {
- PipelineOptionsFactory.register(V1Beta3TestOptions.class);
- options = TestPipeline.testingPipelineOptions().as(V1Beta3TestOptions.class);
- ancestor = UUID.randomUUID().toString();
- }
-
- /**
- * An end-to-end test for {@link V1Beta3.Write}.
- *
- * Write some test entities to datastore through a dataflow pipeline.
- * Read and count all the entities. Verify that the count matches the
- * number of entities written.
- */
- @Test
- public void testE2EV1Beta3Write() throws Exception {
- Pipeline p = Pipeline.create(options);
-
- // Write to datastore
- p.apply(CountingInput.upTo(numEntities))
- .apply(ParDo.of(new CreateEntityFn(
- options.getKind(), options.getNamespace(), ancestor)))
- .apply(DatastoreIO.v1beta3().write().withProjectId(options.getProject()));
-
- p.run();
-
- // Count number of entities written to datastore.
- long numEntitiesWritten = countEntities(options, ancestor);
-
- assertEquals(numEntitiesWritten, numEntities);
- }
-
- @After
- public void tearDown() throws Exception {
- deleteAllEntities(options, ancestor);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
new file mode 100644
index 0000000..8fedc77
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.getDatastore;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeAncestorKey;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.makeEntity;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.UpsertMutationBuilder;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.V1TestWriter;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.values.PCollection;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.client.Datastore;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for {@link DatastoreV1.Read}.
+ */
+@RunWith(JUnit4.class)
+public class V1ReadIT {
+ private V1TestOptions options;
+ private String ancestor;
+ private final long numEntities = 1000;
+
+ @Before
+ public void setup() {
+ PipelineOptionsFactory.register(V1TestOptions.class);
+ options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+ ancestor = UUID.randomUUID().toString();
+ }
+
+ /**
+ * An end-to-end test for {@link DatastoreV1.Read}.
+ *
+ * <p>Write some test entities to datastore and then run a dataflow pipeline that
+ * reads and counts the total number of entities. Verify that the count matches the
+ * number of entities written.
+ */
+ @Test
+ public void testE2EV1Read() throws Exception {
+ // Create entities and write them to datastore
+ writeEntitiesToDatastore(options, ancestor, numEntities);
+
+ // Read from datastore
+ Query query = V1TestUtil.makeAncestorKindQuery(
+ options.getKind(), options.getNamespace(), ancestor);
+
+ DatastoreV1.Read read = DatastoreIO.v1().read()
+ .withProjectId(options.getProject())
+ .withQuery(query)
+ .withNamespace(options.getNamespace());
+
+ // Count the total number of entities
+ Pipeline p = Pipeline.create(options);
+ PCollection<Long> count = p
+ .apply(read)
+ .apply(Count.<Entity>globally());
+
+ PAssert.thatSingleton(count).isEqualTo(numEntities);
+ p.run();
+ }
+
+ // Creates entities and writes them to datastore.
+ private static void writeEntitiesToDatastore(V1TestOptions options, String ancestor,
+ long numEntities) throws Exception {
+ Datastore datastore = getDatastore(options, options.getProject());
+ // Write test entities to datastore
+ V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
+ Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
+
+ for (long i = 0; i < numEntities; i++) {
+ Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace());
+ writer.write(entity);
+ }
+ writer.close();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ deleteAllEntities(options, ancestor);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
new file mode 100644
index 0000000..360855f
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestOptions.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+import javax.annotation.Nullable;
+
+/**
+ * Datastore V1 related pipeline options for tests.
+ */
+public interface V1TestOptions extends TestPipelineOptions {
+ @Description("Project ID to read from datastore")
+ @Default.String("apache-beam-testing")
+ String getProject();
+ void setProject(String value);
+
+ @Description("Datastore Entity kind")
+ @Default.String("beam-test")
+ String getKind();
+ void setKind(String value);
+
+ @Description("Datastore Namespace")
+ String getNamespace();
+ void setNamespace(@Nullable String value);
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
new file mode 100644
index 0000000..1e323ec
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1TestUtil.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
+import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import org.apache.beam.sdk.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.util.AttemptBoundedExponentialBackOff;
+import org.apache.beam.sdk.util.RetryHttpRequestInitializer;
+
+import com.google.api.client.auth.oauth2.Credential;
+import com.google.api.client.util.BackOff;
+import com.google.api.client.util.BackOffUtils;
+import com.google.api.client.util.Sleeper;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.EntityResult;
+import com.google.datastore.v1.Key;
+import com.google.datastore.v1.Key.PathElement;
+import com.google.datastore.v1.Mutation;
+import com.google.datastore.v1.PropertyFilter;
+import com.google.datastore.v1.Query;
+import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.RunQueryRequest;
+import com.google.datastore.v1.RunQueryResponse;
+import com.google.datastore.v1.client.Datastore;
+import com.google.datastore.v1.client.DatastoreException;
+import com.google.datastore.v1.client.DatastoreFactory;
+import com.google.datastore.v1.client.DatastoreOptions;
+import com.google.protobuf.Int32Value;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import javax.annotation.Nullable;
+
+class V1TestUtil {
+ private static final Logger LOG = LoggerFactory.getLogger(V1TestUtil.class);
+
+ /** Builds the ancestor key shared by all entities that the tests create or query. */
+ static Key makeAncestorKey(@Nullable String namespace, String kind, String ancestor) {
+   Key.Builder ancestorKey = makeKey(kind, ancestor);
+   if (namespace == null) {
+     // No namespace requested: leave the partition id untouched.
+     return ancestorKey.build();
+   }
+   ancestorKey.getPartitionIdBuilder().setNamespaceId(namespace);
+   return ancestorKey.build();
+ }
+
+ /**
+ * Builds a query over {@code kind} that is filtered to the given ancestor and namespace.
+ */
+ static Query makeAncestorKindQuery(String kind, @Nullable String namespace, String ancestor) {
+   Key ancestorKey = makeAncestorKey(namespace, kind, ancestor);
+   Query.Builder queryBuilder = Query.newBuilder();
+   queryBuilder.addKindBuilder().setName(kind);
+   // The HAS_ANCESTOR filter on __key__ is what ties the query to ancestorKey.
+   queryBuilder.setFilter(
+       makeFilter("__key__", PropertyFilter.Operator.HAS_ANCESTOR, makeValue(ancestorKey)));
+   return queryBuilder.build();
+ }
+
+ /**
+ * Creates an entity keyed by a random UUID under {@code ancestorKey} that stores {@code value}.
+ */
+ static Entity makeEntity(Long value, Key ancestorKey, String kind, @Nullable String namespace) {
+   String entityName = UUID.randomUUID().toString();
+   Key.Builder entityKey = makeKey(ancestorKey, kind, entityName);
+   // NOTE: DatastoreHelper.makeKey does not carry the namespace over from ancestorKey, so it
+   // has to be set again on the new key. TODO: Once partitionId inheritance is added, we can
+   // simplify this code.
+   if (namespace != null) {
+     entityKey.getPartitionIdBuilder().setNamespaceId(namespace);
+   }
+
+   Entity.Builder entity = Entity.newBuilder().setKey(entityKey.build());
+   entity.getMutableProperties().put("value", makeValue(value).build());
+   return entity.build();
+ }
+
+ /**
+ * A DoFn that creates an entity under a fixed ancestor key for each input long.
+ */
+ static class CreateEntityFn extends DoFn<Long, Entity> {
+   private final String kind;
+   @Nullable
+   private final String namespace;
+   private final Key ancestorKey;
+
+   CreateEntityFn(String kind, @Nullable String namespace, String ancestor) {
+     this.kind = kind;
+     this.namespace = namespace;
+     // Built once here (and kept final) so every created entity shares the same ancestor.
+     this.ancestorKey = makeAncestorKey(namespace, kind, ancestor);
+   }
+
+   @ProcessElement
+   public void processElement(ProcessContext c) throws Exception {
+     c.output(makeEntity(c.element(), ancestorKey, kind, namespace));
+   }
+ }
+
+ /**
+ * Creates a datastore client for {@code projectId} with a retrying request initializer,
+ * attaching the GCP credential from {@code pipelineOptions} when it is not null.
+ */
+ static Datastore getDatastore(PipelineOptions pipelineOptions, String projectId) {
+   DatastoreOptions.Builder datastoreOptions = new DatastoreOptions.Builder();
+   datastoreOptions.projectId(projectId);
+   datastoreOptions.initializer(new RetryHttpRequestInitializer());
+
+   GcpOptions gcpOptions = pipelineOptions.as(GcpOptions.class);
+   Credential gcpCredential = gcpOptions.getGcpCredential();
+   if (gcpCredential != null) {
+     datastoreOptions.credential(gcpCredential);
+   }
+
+   return DatastoreFactory.get().create(datastoreOptions.build());
+ }
+
+ /** Wraps {@code query} in a run-query request, setting the namespace when one is given. */
+ private static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
+   RunQueryRequest.Builder request = RunQueryRequest.newBuilder();
+   request.setQuery(query);
+   if (namespace == null) {
+     return request.build();
+   }
+   request.getPartitionIdBuilder().setNamespaceId(namespace);
+   return request.build();
+ }
+
+ /**
+ * Deletes every entity returned by the ancestor query for {@code ancestor}, using the
+ * project, kind and namespace from {@code options}.
+ */
+ static void deleteAllEntities(V1TestOptions options, String ancestor) throws Exception {
+   Datastore client = getDatastore(options, options.getProject());
+   String namespace = options.getNamespace();
+   Query descendants = makeAncestorKindQuery(options.getKind(), namespace, ancestor);
+
+   V1TestReader reader = new V1TestReader(client, descendants, namespace);
+   V1TestWriter deleter = new V1TestWriter(client, new DeleteMutationBuilder());
+
+   long deletedCount = 0;
+   for (; reader.advance(); deletedCount++) {
+     deleter.write(reader.getCurrent());
+   }
+
+   // Flush whatever is still buffered before reporting the total.
+   deleter.close();
+   LOG.info("Successfully deleted {} entities", deletedCount);
+ }
+
+ /**
+ * Counts the entities returned by the ancestor query for {@code ancestor}, using the
+ * project, kind and namespace from {@code options}.
+ */
+ static long countEntities(V1TestOptions options, String ancestor) throws Exception {
+   Datastore client = getDatastore(options, options.getProject());
+   String namespace = options.getNamespace();
+   Query descendants = makeAncestorKindQuery(options.getKind(), namespace, ancestor);
+
+   V1TestReader reader = new V1TestReader(client, descendants, namespace);
+
+   // Only the number of results matters, so the entities themselves are not inspected.
+   long count = 0;
+   while (reader.advance()) {
+     count++;
+   }
+   return count;
+ }
+
+ /**
+ * An interface for building the datastore mutation to apply to an entity.
+ * Mutation operations include insert, delete, upsert and update.
+ */
+ interface MutationBuilder {
+ Mutation.Builder apply(Entity entity);
+ }
+
+ /**
+ * A MutationBuilder that builds an upsert mutation for the given entity.
+ */
+ static class UpsertMutationBuilder implements MutationBuilder {
+   @Override public Mutation.Builder apply(Entity entity) {
+     return makeUpsert(entity);
+   }
+ }
+
+ /**
+ * A MutationBuilder that builds a delete mutation for the given entity's key.
+ */
+ static class DeleteMutationBuilder implements MutationBuilder {
+   @Override public Mutation.Builder apply(Entity entity) {
+     return makeDelete(entity.getKey());
+   }
+ }
+
+ /**
+ * Buffers entities and commits them to datastore in batches, retrying failed commits.
+ */
+ static class V1TestWriter {
+   private static final Logger LOG = LoggerFactory.getLogger(V1TestWriter.class);
+   // Limits the number of entities updated per batch.
+   private static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
+   // Number of times to retry on update failure.
+   private static final int MAX_RETRIES = 5;
+   // Initial backoff time for exponential backoff for retry attempts.
+   private static final int INITIAL_BACKOFF_MILLIS = 5000;
+
+   // Returns true if a Datastore key is complete. A key is complete if its last element
+   // has either an id or a name.
+   static boolean isValidKey(Key key) {
+     List<PathElement> elementList = key.getPathList();
+     if (elementList.isEmpty()) {
+       return false;
+     }
+     PathElement lastElement = elementList.get(elementList.size() - 1);
+     return lastElement.getId() != 0 || !lastElement.getName().isEmpty();
+   }
+
+   private final Datastore datastore;
+   private final MutationBuilder mutationBuilder;
+   private final List<Entity> entities = new ArrayList<>();
+
+   V1TestWriter(Datastore datastore, MutationBuilder mutationBuilder) {
+     this.datastore = datastore;
+     this.mutationBuilder = mutationBuilder;
+   }
+
+   // Buffers value (which must have a complete key) and flushes once a batch is full.
+   void write(Entity value) throws Exception {
+     if (!isValidKey(value.getKey())) {
+       throw new IllegalArgumentException(
+           "Entities to be written to the Datastore must have complete keys");
+     }
+     entities.add(value);
+     if (entities.size() >= DATASTORE_BATCH_UPDATE_LIMIT) {
+       flushBatch();
+     }
+   }
+
+   // Flushes any remaining buffered entities.
+   void close() throws Exception {
+     if (!entities.isEmpty()) {
+       flushBatch();
+     }
+   }
+
+   // Commits the buffered entities in one non-transactional request, retrying with backoff.
+   private void flushBatch() throws DatastoreException, IOException, InterruptedException {
+     LOG.info("Writing batch of {} entities", entities.size());
+     Sleeper sleeper = Sleeper.DEFAULT;
+     BackOff backoff = new AttemptBoundedExponentialBackOff(MAX_RETRIES, INITIAL_BACKOFF_MILLIS);
+
+     // The request does not change between attempts, so build it once outside the retry loop.
+     CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+     for (Entity entity : entities) {
+       commitRequest.addMutations(mutationBuilder.apply(entity));
+     }
+     commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+     CommitRequest request = commitRequest.build();
+
+     while (true) {
+       try {
+         datastore.commit(request);
+         // Break if the commit threw no exception.
+         break;
+       } catch (DatastoreException exception) {
+         LOG.error("Error writing to the Datastore ({}): {}", exception.getCode(),
+             exception.getMessage());
+         if (!BackOffUtils.next(sleeper, backoff)) {
+           LOG.error("Aborting after {} retries.", MAX_RETRIES);
+           throw exception;
+         }
+       }
+     }
+     LOG.info("Successfully wrote {} entities", entities.size());
+     entities.clear();
+   }
+ }
+
+ /**
+ * Reads the results of a query from datastore one batch at a time.
+ */
+ static class V1TestReader {
+   private static final int QUERY_BATCH_LIMIT = 500;
+   private final Datastore datastore;
+   private final Query query;
+   @Nullable
+   private final String namespace;
+   // Whether the last response indicated that another batch may follow.
+   private boolean moreResults;
+   // Results of the current batch; null until the first batch has been fetched.
+   private Iterator<EntityResult> entities;
+   // Current batch of query results
+   private QueryResultBatch currentBatch;
+   private Entity currentEntity;
+
+   V1TestReader(Datastore datastore, Query query, @Nullable String namespace) {
+     this.datastore = datastore;
+     this.query = query;
+     this.namespace = namespace;
+   }
+
+   // Returns the entity selected by the last advance(), or null if it returned false.
+   Entity getCurrent() {
+     return currentEntity;
+   }
+
+   // Moves to the next entity, fetching another batch when the current one is used up.
+   boolean advance() throws IOException {
+     if (entities == null || (!entities.hasNext() && moreResults)) {
+       try {
+         entities = getIteratorAndMoveCursor();
+       } catch (DatastoreException e) {
+         throw new IOException(e);
+       }
+     }
+
+     if (!entities.hasNext()) {
+       currentEntity = null;
+       return false;
+     }
+
+     currentEntity = entities.next().getEntity();
+     return true;
+   }
+
+   // Reads the next batch of query results; the returned iterator is never null.
+   private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
+     Query.Builder query = this.query.toBuilder();
+     query.setLimit(Int32Value.newBuilder().setValue(QUERY_BATCH_LIMIT));
+     if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
+       query.setStartCursor(currentBatch.getEndCursor());
+     }
+
+     RunQueryRequest request = makeRequest(query.build(), namespace);
+     RunQueryResponse response = datastore.runQuery(request);
+     currentBatch = response.getBatch();
+
+     int numFetch = currentBatch.getEntityResultsCount();
+     // All indications from the API are that there are/may be more results.
+     moreResults = ((numFetch == QUERY_BATCH_LIMIT)
+         || (currentBatch.getMoreResults() == NOT_FINISHED));
+
+     // May receive a batch of 0 results if the number of records is a multiple of the
+     // request limit. That yields an empty iterator rather than null, so once the results
+     // are exhausted advance() keeps returning false without querying again.
+     return currentBatch.getEntityResultsList().iterator();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/54e4cb12/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
new file mode 100644
index 0000000..b97c05c
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1WriteIT.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.countEntities;
+import static org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.deleteAllEntities;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.gcp.datastore.V1TestUtil.CreateEntityFn;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.ParDo;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.UUID;
+
+/**
+ * End-to-end tests for Datastore DatastoreV1.Write.
+ */
+@RunWith(JUnit4.class)
+public class V1WriteIT {
+  private V1TestOptions options;
+  private String ancestor;
+  private final long numEntities = 1000;
+
+  @Before
+  public void setup() {
+    PipelineOptionsFactory.register(V1TestOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(V1TestOptions.class);
+    ancestor = UUID.randomUUID().toString();
+  }
+
+  /**
+   * An end-to-end test for {@link DatastoreV1.Write}.
+   *
+   * <p>Write some test entities to datastore through a dataflow pipeline.
+   * Read and count all the entities. Verify that the count matches the
+   * number of entities written.
+   */
+  @Test
+  public void testE2EV1Write() throws Exception {
+    Pipeline p = Pipeline.create(options);
+
+    // Write to datastore
+    p.apply(CountingInput.upTo(numEntities))
+        .apply(ParDo.of(new CreateEntityFn(
+            options.getKind(), options.getNamespace(), ancestor)))
+        .apply(DatastoreIO.v1().write().withProjectId(options.getProject()));
+
+    p.run();
+
+    // Count number of entities written to datastore.
+    long numEntitiesWritten = countEntities(options, ancestor);
+    // assertEquals takes (expected, actual); this order keeps failure messages accurate.
+    assertEquals(numEntities, numEntitiesWritten);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    deleteAllEntities(options, ancestor);
+  }
+}