You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/06/22 16:55:08 UTC
[2/2] incubator-beam git commit: DatastoreIO: Update datastore API to
v1beta3
DatastoreIO: Update datastore API to v1beta3
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/104f4dd2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/104f4dd2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/104f4dd2
Branch: refs/heads/master
Commit: 104f4dd27dae9643f953d964b31a0d7674cbf1e9
Parents: f480944
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Thu Jun 16 13:57:43 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Wed Jun 22 09:54:51 2016 -0700
----------------------------------------------------------------------
examples/java/pom.xml | 15 +-
.../beam/examples/complete/AutoComplete.java | 35 +--
.../examples/cookbook/DatastoreWordCount.java | 48 ++--
pom.xml | 44 +--
runners/google-cloud-dataflow-java/pom.xml | 11 +-
.../dataflow/io/DataflowDatastoreIOTest.java | 15 +-
sdks/java/core/pom.xml | 9 +-
.../org/apache/beam/sdk/coders/EntityCoder.java | 87 ------
.../org/apache/beam/sdk/io/DatastoreIO.java | 286 ++++++++-----------
.../apache/beam/sdk/coders/EntityCoderTest.java | 110 -------
.../sdk/coders/protobuf/ProtobufUtilTest.java | 7 +-
.../org/apache/beam/sdk/io/DatastoreIOTest.java | 171 +++++------
.../apache/beam/sdk/util/ApiSurfaceTest.java | 4 +-
13 files changed, 279 insertions(+), 563 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index 5167810..cac9857 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -260,11 +260,6 @@
<dependency>
<groupId>com.google.apis</groupId>
- <artifactId>google-api-services-datastore-protobuf</artifactId>
- </dependency>
-
- <dependency>
- <groupId>com.google.apis</groupId>
<artifactId>google-api-services-pubsub</artifactId>
</dependency>
@@ -279,6 +274,16 @@
</dependency>
<dependency>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-proto-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-protos</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
</dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index ef47762..c6893f4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.examples.complete;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
+
import org.apache.beam.examples.common.DataflowExampleUtils;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExamplePubsubTopicOptions;
@@ -55,18 +57,20 @@ import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.api.services.datastore.client.DatastoreHelper;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
import org.joda.time.Duration;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -396,16 +400,15 @@ public class AutoComplete {
entityBuilder.setKey(key);
List<Value> candidates = new ArrayList<>();
+ Map<String, Value> properties = new HashMap<>();
for (CompletionCandidate tag : c.element().getValue()) {
Entity.Builder tagEntity = Entity.newBuilder();
- tagEntity.addProperty(
- DatastoreHelper.makeProperty("tag", DatastoreHelper.makeValue(tag.value)));
- tagEntity.addProperty(
- DatastoreHelper.makeProperty("count", DatastoreHelper.makeValue(tag.count)));
- candidates.add(DatastoreHelper.makeValue(tagEntity).setIndexed(false).build());
+ properties.put("tag", makeValue(tag.value).build());
+ properties.put("count", makeValue(tag.count).build());
+ candidates.add(makeValue(tagEntity).build());
}
- entityBuilder.addProperty(
- DatastoreHelper.makeProperty("candidates", DatastoreHelper.makeValue(candidates)));
+ properties.put("candidates", makeValue(candidates).build());
+ entityBuilder.putAllProperties(properties);
c.output(entityBuilder.build());
}
}
@@ -426,7 +429,7 @@ public class AutoComplete {
Boolean getRecursive();
void setRecursive(Boolean value);
- @Description("Dataset entity kind")
+ @Description("Datastore entity kind")
@Default.String("autocomplete-demo")
String getKind();
void setKind(String value);
@@ -441,9 +444,9 @@ public class AutoComplete {
Boolean getOutputToDatastore();
void setOutputToDatastore(Boolean value);
- @Description("Datastore output dataset ID, defaults to project ID")
- String getOutputDataset();
- void setOutputDataset(String value);
+ @Description("Datastore output project ID, defaults to project ID")
+ String getOutputProject();
+ void setOutputProject(String value);
}
public static void main(String[] args) throws IOException {
@@ -480,7 +483,7 @@ public class AutoComplete {
toWrite
.apply(ParDo.named("FormatForDatastore").of(new FormatForDatastore(options.getKind())))
.apply(DatastoreIO.writeTo(MoreObjects.firstNonNull(
- options.getOutputDataset(), options.getProject())));
+ options.getOutputProject(), options.getProject())));
}
if (options.getOutputToBigQuery()) {
dataflowUtils.setupBigQueryTable();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index b0192c9..7578d79 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -17,11 +17,10 @@
*/
package org.apache.beam.examples.cookbook;
-import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
-import static com.google.api.services.datastore.client.DatastoreHelper.getString;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.getString;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
@@ -37,12 +36,11 @@ import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Property;
-import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.Value;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.PropertyFilter;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.Value;
import java.util.Map;
import java.util.UUID;
@@ -64,7 +62,6 @@ import javax.annotation.Nullable;
* <p>To run this pipeline locally, the following options must be provided:
* <pre>{@code
* --project=YOUR_PROJECT_ID
- * --dataset=YOUR_DATASET_ID
* --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PATH]
* }</pre>
*
@@ -89,7 +86,7 @@ public class DatastoreWordCount {
static class GetContentFn extends DoFn<Entity, String> {
@Override
public void processElement(ProcessContext c) {
- Map<String, Value> props = getPropertyMap(c.element());
+ Map<String, Value> props = c.element().getProperties();
Value value = props.get("content");
if (value != null) {
c.output(getString(value));
@@ -106,7 +103,7 @@ public class DatastoreWordCount {
static Key makeAncestorKey(@Nullable String namespace, String kind) {
Key.Builder keyBuilder = makeKey(kind, "root");
if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+ keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
return keyBuilder.build();
}
@@ -136,12 +133,11 @@ public class DatastoreWordCount {
// we must set the namespace on keyBuilder. TODO: Once partitionId inheritance is added,
// we can simplify this code.
if (namespace != null) {
- keyBuilder.getPartitionIdBuilder().setNamespace(namespace);
+ keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
entityBuilder.setKey(keyBuilder.build());
- entityBuilder.addProperty(Property.newBuilder().setName("content")
- .setValue(Value.newBuilder().setStringValue(content)));
+ entityBuilder.getMutableProperties().put("content", makeValue(content).build());
return entityBuilder.build();
}
@@ -167,21 +163,21 @@ public class DatastoreWordCount {
String getOutput();
void setOutput(String value);
- @Description("Dataset ID to read from datastore")
+ @Description("Project ID to read from datastore")
@Validation.Required
- String getDataset();
- void setDataset(String value);
+ String getProject();
+ void setProject(String value);
- @Description("Dataset entity kind")
+ @Description("Datastore Entity kind")
@Default.String("shakespeare-demo")
String getKind();
void setKind(String value);
- @Description("Dataset namespace")
+ @Description("Datastore Namespace")
String getNamespace();
void setNamespace(@Nullable String value);
- @Description("Read an existing dataset, do not write first")
+ @Description("Read an existing project, do not write first")
boolean isReadOnly();
void setReadOnly(boolean value);
@@ -199,7 +195,7 @@ public class DatastoreWordCount {
Pipeline p = Pipeline.create(options);
p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
.apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
- .apply(DatastoreIO.writeTo(options.getDataset()));
+ .apply(DatastoreIO.writeTo(options.getProject()));
p.run();
}
@@ -231,7 +227,7 @@ public class DatastoreWordCount {
// For Datastore sources, the read namespace can be set on the entire query.
DatastoreIO.Source source = DatastoreIO.source()
- .withDataset(options.getDataset())
+ .withProject(options.getProject())
.withQuery(query)
.withNamespace(options.getNamespace());
@@ -259,7 +255,7 @@ public class DatastoreWordCount {
// First example: write data to Datastore for reading later.
//
// NOTE: this write does not delete any existing Entities in the Datastore, so if run
- // multiple times with the same output dataset, there may be duplicate entries. The
+ // multiple times with the same output project, there may be duplicate entries. The
// Datastore Query tool in the Google Developers Console can be used to inspect or erase all
// entries with a particular namespace and/or kind.
DatastoreWordCount.writeDataToDatastore(options);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 49b77fa..31beef1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -105,7 +105,8 @@
<clouddebugger.version>v2-rev8-1.22.0</clouddebugger.version>
<dataflow.version>v1b3-rev26-1.22.0</dataflow.version>
<dataflow.proto.version>0.5.160222</dataflow.proto.version>
- <datastore.version>v1beta2-rev1-4.0.0</datastore.version>
+ <datastore.client.version>1.0.0-beta.2</datastore.client.version>
+ <datastore.proto.version>1.0.0-beta</datastore.proto.version>
<google-auto-service.version>1.0-rc2</google-auto-service.version>
<google-auto-value.version>1.1</google-auto-value.version>
<google-clients.version>1.22.0</google-clients.version>
@@ -437,38 +438,15 @@
</dependency>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-datastore-protobuf</artifactId>
- <version>${datastore.version}</version>
- <exclusions>
- <!-- Exclude an old version of guava that is being pulled in by a transitive
- dependency of google-api-client -->
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava-jdk5</artifactId>
- </exclusion>
- <!-- Exclude old version of api client dependencies. -->
- <exclusion>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.api-client</groupId>
- <artifactId>google-api-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.oauth-client</groupId>
- <artifactId>google-oauth-client</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client-jackson</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.http-client</groupId>
- <artifactId>google-http-client-protobuf</artifactId>
- </exclusion>
- </exclusions>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-proto-client</artifactId>
+ <version>${datastore.client.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-protos</artifactId>
+ <version>${datastore.proto.version}</version>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/runners/google-cloud-dataflow-java/pom.xml
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/pom.xml b/runners/google-cloud-dataflow-java/pom.xml
index 999e16d..5408462 100644
--- a/runners/google-cloud-dataflow-java/pom.xml
+++ b/runners/google-cloud-dataflow-java/pom.xml
@@ -313,11 +313,6 @@
</dependency>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-datastore-protobuf</artifactId>
- </dependency>
-
- <dependency>
<groupId>com.google.oauth-client</groupId>
<artifactId>google-oauth-client</artifactId>
</dependency>
@@ -454,5 +449,11 @@
<artifactId>google-cloud-dataflow-java-proto-library-all</artifactId>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-protos</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
index 42a0b99..e7c0791 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/io/DataflowDatastoreIOTest.java
@@ -30,7 +30,8 @@ import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
-import com.google.api.services.datastore.DatastoreV1;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.Query;
import org.junit.Test;
@@ -44,20 +45,20 @@ public class DataflowDatastoreIOTest {
public void testSourcePrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
PTransform<PInput, ?> read = DatastoreIO.readFrom(
- "myDataset", DatastoreV1.Query.newBuilder().build());
+ "myProject", Query.newBuilder().build());
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(read);
- assertThat("DatastoreIO read should include the dataset in its primitive display data",
- displayData, hasItem(hasDisplayItem("dataset")));
+ assertThat("DatastoreIO read should include the project in its primitive display data",
+ displayData, hasItem(hasDisplayItem("project")));
}
@Test
public void testSinkPrimitiveDisplayData() {
DisplayDataEvaluator evaluator = DataflowDisplayDataEvaluator.create();
- PTransform<PCollection<DatastoreV1.Entity>, ?> write = DatastoreIO.writeTo("myDataset");
+ PTransform<PCollection<Entity>, ?> write = DatastoreIO.writeTo("myProject");
Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat("DatastoreIO write should include the dataset in its primitive display data",
- displayData, hasItem(hasDisplayItem("dataset")));
+ assertThat("DatastoreIO write should include the project in its primitive display data",
+ displayData, hasItem(hasDisplayItem("project")));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/pom.xml
----------------------------------------------------------------------
diff --git a/sdks/java/core/pom.xml b/sdks/java/core/pom.xml
index c559cff..bbba77b 100644
--- a/sdks/java/core/pom.xml
+++ b/sdks/java/core/pom.xml
@@ -393,8 +393,13 @@
</dependency>
<dependency>
- <groupId>com.google.apis</groupId>
- <artifactId>google-api-services-datastore-protobuf</artifactId>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-proto-client</artifactId>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.cloud.datastore</groupId>
+ <artifactId>datastore-v1beta3-protos</artifactId>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java
deleted file mode 100644
index a704772..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/EntityCoder.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A {@link Coder} for {@link Entity} objects based on their encoded Protocol Buffer form.
- */
-public class EntityCoder extends AtomicCoder<Entity> {
-
- @JsonCreator
- public static EntityCoder of() {
- return INSTANCE;
- }
-
- /***************************/
-
- private static final EntityCoder INSTANCE = new EntityCoder();
-
- private EntityCoder() {}
-
- @Override
- public void encode(Entity value, OutputStream outStream, Context context)
- throws IOException, CoderException {
- if (value == null) {
- throw new CoderException("cannot encode a null Entity");
- }
-
- // Since Entity implements com.google.protobuf.MessageLite,
- // we could directly use writeTo to write to a OutputStream object
- outStream.write(java.nio.ByteBuffer.allocate(4).putInt(value.getSerializedSize()).array());
- value.writeTo(outStream);
- outStream.flush();
- }
-
- @Override
- public Entity decode(InputStream inStream, Context context)
- throws IOException {
- byte[] entitySize = new byte[4];
- inStream.read(entitySize, 0, 4);
- int size = java.nio.ByteBuffer.wrap(entitySize).getInt();
- byte[] data = new byte[size];
- inStream.read(data, 0, size);
- return Entity.parseFrom(data);
- }
-
- @Override
- protected long getEncodedElementByteSize(Entity value, Context context)
- throws Exception {
- return value.getSerializedSize();
- }
-
- /**
- * {@inheritDoc}
- *
- * @throws NonDeterministicException always.
- * A datastore kind can hold arbitrary {@link Object} instances, which
- * makes the encoding non-deterministic.
- */
- @Override
- public void verifyDeterministic() throws NonDeterministicException {
- throw new NonDeterministicException(this,
- "Datastore encodings can hold arbitrary Object instances");
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
index 137c6cd..7fab79d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/DatastoreIO.java
@@ -17,21 +17,22 @@
*/
package org.apache.beam.sdk.io;
-import static com.google.api.services.datastore.DatastoreV1.PropertyFilter.Operator.EQUAL;
-import static com.google.api.services.datastore.DatastoreV1.PropertyOrder.Direction.DESCENDING;
-import static com.google.api.services.datastore.DatastoreV1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
-import static com.google.api.services.datastore.client.DatastoreHelper.getPropertyMap;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeFilter;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeOrder;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Verify.verify;
+import static com.google.datastore.v1beta3.PropertyFilter.Operator.EQUAL;
+import static com.google.datastore.v1beta3.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1beta3.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeAndFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeFilter;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeOrder;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeUpsert;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeValue;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.EntityCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.protobuf.ProtoCoder;
import org.apache.beam.sdk.io.Sink.WriteOperation;
import org.apache.beam.sdk.io.Sink.Writer;
import org.apache.beam.sdk.options.GcpOptions;
@@ -45,26 +46,27 @@ import com.google.api.client.auth.oauth2.Credential;
import com.google.api.client.util.BackOff;
import com.google.api.client.util.BackOffUtils;
import com.google.api.client.util.Sleeper;
-import com.google.api.services.datastore.DatastoreV1.CommitRequest;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.EntityResult;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.Key.PathElement;
-import com.google.api.services.datastore.DatastoreV1.PartitionId;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.QueryResultBatch;
-import com.google.api.services.datastore.DatastoreV1.RunQueryRequest;
-import com.google.api.services.datastore.DatastoreV1.RunQueryResponse;
-import com.google.api.services.datastore.client.Datastore;
-import com.google.api.services.datastore.client.DatastoreException;
-import com.google.api.services.datastore.client.DatastoreFactory;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.api.services.datastore.client.DatastoreOptions;
-import com.google.api.services.datastore.client.QuerySplitter;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
+import com.google.datastore.v1beta3.CommitRequest;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.Key.PathElement;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreException;
+import com.google.datastore.v1beta3.client.DatastoreFactory;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.DatastoreOptions;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -94,10 +96,9 @@ import javax.annotation.Nullable;
* </pre>
*
* <p>To read a {@link PCollection} from a query to Datastore, use {@link DatastoreIO#source} and
- * its methods {@link DatastoreIO.Source#withDataset} and {@link DatastoreIO.Source#withQuery} to
- * specify the dataset to query and the query to read from. You can optionally provide a namespace
- * to query within using {@link DatastoreIO.Source#withNamespace} or a Datastore host using
- * {@link DatastoreIO.Source#withHost}.
+ * its methods {@link DatastoreIO.Source#withProject} and {@link DatastoreIO.Source#withQuery} to
+ * specify the project to query and the query to read from. You can optionally provide a namespace
+ * to query within using {@link DatastoreIO.Source#withNamespace}.
*
* <p>For example:
*
@@ -105,32 +106,31 @@ import javax.annotation.Nullable;
* // Read a query from Datastore
* PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
* Query query = ...;
- * String dataset = "...";
+ * String projectId = "...";
*
* Pipeline p = Pipeline.create(options);
* PCollection<Entity> entities = p.apply(
* Read.from(DatastoreIO.source()
- * .withDataset(datasetId)
- * .withQuery(query)
- * .withHost(host)));
+ * .withProject(projectId)
+ * .withQuery(query));
* } </pre>
*
* <p>or:
*
* <pre> {@code
- * // Read a query from Datastore using the default namespace and host
+ * // Read a query from Datastore using the default namespace
* PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
* Query query = ...;
- * String dataset = "...";
+ * String projectId = "...";
*
* Pipeline p = Pipeline.create(options);
- * PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(datasetId, query));
+ * PCollection<Entity> entities = p.apply(DatastoreIO.readFrom(projectId, query));
* p.run();
* } </pre>
*
* <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel across
* many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then
+ * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then
* all returned results will be read by a single Dataflow worker in order to ensure correct data.
*
* <p>To write a {@link PCollection} to a Datastore, use {@link DatastoreIO#writeTo},
@@ -138,19 +138,10 @@ import javax.annotation.Nullable;
*
* <pre> {@code
* PCollection<Entity> entities = ...;
- * entities.apply(DatastoreIO.writeTo(dataset));
+ * entities.apply(DatastoreIO.writeTo(projectId));
* p.run();
* } </pre>
*
- * <p>To optionally change the host that is used to write to the Datastore, use {@link
- * DatastoreIO#sink} to build a {@link DatastoreIO.Sink} and write to it using the {@link Write}
- * transform:
- *
- * <pre> {@code
- * PCollection<Entity> entities = ...;
- * entities.apply(Write.to(DatastoreIO.sink().withDataset(dataset).withHost(host)));
- * } </pre>
- *
* <p>{@link Entity Entities} in the {@code PCollection} to be written must have complete
* {@link Key Keys}. Complete {@code Keys} specify the {@code name} and {@code id} of the
* {@code Entity}, where incomplete {@code Keys} do not. A {@code namespace} other than the
@@ -177,8 +168,6 @@ import javax.annotation.Nullable;
*/
@Experimental(Experimental.Kind.SOURCE_SINK)
public class DatastoreIO {
- public static final String DEFAULT_HOST = "https://www.googleapis.com";
-
/**
* Datastore has a limit of 500 mutations per batch operation, so we flush
* changes to Datastore every 500 entities.
@@ -186,9 +175,9 @@ public class DatastoreIO {
public static final int DATASTORE_BATCH_UPDATE_LIMIT = 500;
/**
- * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}.
- * Configure the {@code dataset}, {@code query}, and {@code namespace} using
- * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery},
+ * Returns an empty {@link DatastoreIO.Source} builder.
+ * Configure the {@code project}, {@code query}, and {@code namespace} using
+ * {@link DatastoreIO.Source#withProject}, {@link DatastoreIO.Source#withQuery},
* and {@link DatastoreIO.Source#withNamespace}.
*
* @deprecated the name and return type do not match. Use {@link #source()}.
@@ -199,48 +188,32 @@ public class DatastoreIO {
}
/**
- * Returns an empty {@link DatastoreIO.Source} builder with the default {@code host}.
- * Configure the {@code dataset}, {@code query}, and {@code namespace} using
- * {@link DatastoreIO.Source#withDataset}, {@link DatastoreIO.Source#withQuery},
+ * Returns an empty {@link DatastoreIO.Source} builder.
+ * Configure the {@code project}, {@code query}, and {@code namespace} using
+ * {@link DatastoreIO.Source#withProject}, {@link DatastoreIO.Source#withQuery},
* and {@link DatastoreIO.Source#withNamespace}.
*
* <p>The resulting {@link Source} object can be passed to {@link Read} to create a
* {@code PTransform} that will read from Datastore.
*/
public static Source source() {
- return new Source(DEFAULT_HOST, null, null, null);
+ return new Source(null, null, null);
}
/**
* Returns a {@code PTransform} that reads Datastore entities from the query
- * against the given dataset.
+ * against the given project.
*/
- public static Read.Bounded<Entity> readFrom(String datasetId, Query query) {
- return Read.from(new Source(DEFAULT_HOST, datasetId, query, null));
- }
-
- /**
- * Returns a {@code PTransform} that reads Datastore entities from the query
- * against the given dataset and host.
- *
- * @deprecated prefer {@link #source()} with {@link Source#withHost}, {@link Source#withDataset},
- * {@link Source#withQuery}s.
- */
- @Deprecated
- public static Read.Bounded<Entity> readFrom(String host, String datasetId, Query query) {
- return Read.from(new Source(host, datasetId, query, null));
+ public static Read.Bounded<Entity> readFrom(String projectId, Query query) {
+ return Read.from(new Source(projectId, query, null));
}
/**
* A {@link Source} that reads the result rows of a Datastore query as {@code Entity} objects.
*/
public static class Source extends BoundedSource<Entity> {
- public String getHost() {
- return host;
- }
-
- public String getDataset() {
- return datasetId;
+ public String getProjectId() {
+ return projectId;
}
public Query getQuery() {
@@ -252,9 +225,9 @@ public class DatastoreIO {
return namespace;
}
- public Source withDataset(String datasetId) {
- checkNotNull(datasetId, "datasetId");
- return new Source(host, datasetId, query, namespace);
+ public Source withProject(String projectId) {
+ checkNotNull(projectId, "projectId");
+ return new Source(projectId, query, namespace);
}
/**
@@ -264,28 +237,23 @@ public class DatastoreIO {
*
* <p><b>Note:</b> Normally, a Cloud Dataflow job will read from Cloud Datastore in parallel
* across many workers. However, when the {@link Query} is configured with a limit using
- * {@link com.google.api.services.datastore.DatastoreV1.Query.Builder#setLimit(int)}, then all
+ * {@link com.google.datastore.v1beta3.Query.Builder#setLimit(Int32Value)}, then all
* returned results will be read by a single Dataflow worker in order to ensure correct data.
*/
public Source withQuery(Query query) {
checkNotNull(query, "query");
- checkArgument(!query.hasLimit() || query.getLimit() > 0,
- "Invalid query limit %s: must be positive", query.getLimit());
- return new Source(host, datasetId, query, namespace);
- }
-
- public Source withHost(String host) {
- checkNotNull(host, "host");
- return new Source(host, datasetId, query, namespace);
+ checkArgument(!query.hasLimit() || query.getLimit().getValue() > 0,
+ "Invalid query limit %s: must be positive", query.getLimit().getValue());
+ return new Source(projectId, query, namespace);
}
public Source withNamespace(@Nullable String namespace) {
- return new Source(host, datasetId, query, namespace);
+ return new Source(projectId, query, namespace);
}
@Override
public Coder<Entity> getDefaultOutputCoder() {
- return EntityCoder.of();
+ return ProtoCoder.of(Entity.class);
}
@Override
@@ -327,7 +295,7 @@ public class DatastoreIO {
ImmutableList.Builder<Source> splits = ImmutableList.builder();
for (Query splitQuery : datastoreSplits) {
- splits.add(new Source(host, datasetId, splitQuery, namespace));
+ splits.add(new Source(projectId, splitQuery, namespace));
}
return splits.build();
}
@@ -339,9 +307,8 @@ public class DatastoreIO {
@Override
public void validate() {
- Preconditions.checkNotNull(host, "host");
Preconditions.checkNotNull(query, "query");
- Preconditions.checkNotNull(datasetId, "datasetId");
+ Preconditions.checkNotNull(projectId, "projectId");
}
@Override
@@ -369,7 +336,7 @@ public class DatastoreIO {
} else {
query.addKindBuilder().setName("__Ns_Stat_Kind__");
}
- query.setFilter(makeFilter(
+ query.setFilter(makeAndFilter(
makeFilter("kind_name", EQUAL, makeValue(ourKind)).build(),
makeFilter("timestamp", EQUAL, makeValue(latestTimestamp)).build()));
RunQueryRequest request = makeRequest(query.build());
@@ -379,22 +346,20 @@ public class DatastoreIO {
LOG.info("Query for per-kind statistics took {}ms", System.currentTimeMillis() - now);
QueryResultBatch batch = response.getBatch();
- if (batch.getEntityResultCount() == 0) {
+ if (batch.getEntityResultsCount() == 0) {
throw new NoSuchElementException(
"Datastore statistics for kind " + ourKind + " unavailable");
}
- Entity entity = batch.getEntityResult(0).getEntity();
- return getPropertyMap(entity).get("entity_bytes").getIntegerValue();
+ Entity entity = batch.getEntityResults(0).getEntity();
+ return entity.getProperties().get("entity_bytes").getIntegerValue();
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
- .addIfNotDefault(DisplayData.item("host", host)
- .withLabel("Datastore Service"), DEFAULT_HOST)
- .addIfNotNull(DisplayData.item("dataset", datasetId)
- .withLabel("Input Dataset"))
+ .addIfNotNull(DisplayData.item("project", projectId)
+ .withLabel("Input Project"))
.addIfNotNull(DisplayData.item("namespace", namespace)
.withLabel("App Engine Namespace"));
@@ -407,8 +372,7 @@ public class DatastoreIO {
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("host", host)
- .add("dataset", datasetId)
+ .add("project", projectId)
.add("query", query)
.add("namespace", namespace)
.toString();
@@ -417,10 +381,9 @@ public class DatastoreIO {
///////////////////////////////////////////////////////////////////////////////////////////
private static final Logger LOG = LoggerFactory.getLogger(Source.class);
- private final String host;
/** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. */
@Nullable
- private final String datasetId;
+ private final String projectId;
/** Not really nullable, but it may be {@code null} for in-progress {@code Source}s. */
@Nullable
private final Query query;
@@ -439,10 +402,9 @@ public class DatastoreIO {
* an error will be thrown.
*/
private Source(
- String host, @Nullable String datasetId, @Nullable Query query,
+ @Nullable String projectId, @Nullable Query query,
@Nullable String namespace) {
- this.host = checkNotNull(host, "host");
- this.datasetId = datasetId;
+ this.projectId = projectId;
this.query = query;
this.namespace = namespace;
}
@@ -456,7 +418,7 @@ public class DatastoreIO {
// If namespace is set, include it in the split request so splits are calculated accordingly.
PartitionId.Builder partitionBuilder = PartitionId.newBuilder();
if (namespace != null) {
- partitionBuilder.setNamespace(namespace);
+ partitionBuilder.setNamespaceId(namespace);
}
if (mockSplitter != null) {
@@ -475,7 +437,7 @@ public class DatastoreIO {
private RunQueryRequest makeRequest(Query query) {
RunQueryRequest.Builder requestBuilder = RunQueryRequest.newBuilder().setQuery(query);
if (namespace != null) {
- requestBuilder.getPartitionIdBuilder().setNamespace(namespace);
+ requestBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
}
return requestBuilder.build();
}
@@ -488,26 +450,29 @@ public class DatastoreIO {
Query.Builder query = Query.newBuilder();
query.addKindBuilder().setName("__Stat_Total__");
query.addOrder(makeOrder("timestamp", DESCENDING));
- query.setLimit(1);
+ query.setLimit(Int32Value.newBuilder().setValue(1));
RunQueryRequest request = makeRequest(query.build());
long now = System.currentTimeMillis();
RunQueryResponse response = datastore.runQuery(request);
- LOG.info("Query for latest stats timestamp of dataset {} took {}ms", datasetId,
+ LOG.info("Query for latest stats timestamp of project {} took {}ms", projectId,
System.currentTimeMillis() - now);
QueryResultBatch batch = response.getBatch();
- if (batch.getEntityResultCount() == 0) {
+ if (batch.getEntityResultsCount() == 0) {
throw new NoSuchElementException(
- "Datastore total statistics for dataset " + datasetId + " unavailable");
+ "Datastore total statistics for project " + projectId + " unavailable");
}
- Entity entity = batch.getEntityResult(0).getEntity();
- return getPropertyMap(entity).get("timestamp").getTimestampMicrosecondsValue();
+ Entity entity = batch.getEntityResults(0).getEntity();
+ return entity.getProperties().get("timestamp").getTimestampValue().getNanos();
}
private Datastore getDatastore(PipelineOptions pipelineOptions) {
DatastoreOptions.Builder builder =
- new DatastoreOptions.Builder().host(host).dataset(datasetId).initializer(
- new RetryHttpRequestInitializer());
+ new DatastoreOptions.Builder()
+ .projectId(projectId)
+ .initializer(
+ new RetryHttpRequestInitializer()
+ );
Credential credential = pipelineOptions.as(GcpOptions.class).getGcpCredential();
if (credential != null) {
@@ -518,7 +483,7 @@ public class DatastoreIO {
/** For testing only. */
Source withMockSplitter(QuerySplitter splitter) {
- Source res = new Source(host, datasetId, query, namespace);
+ Source res = new Source(projectId, query, namespace);
res.mockSplitter = splitter;
res.mockEstimateSizeBytes = mockEstimateSizeBytes;
return res;
@@ -526,7 +491,7 @@ public class DatastoreIO {
/** For testing only. */
Source withMockEstimateSizeBytes(Long estimateSizeBytes) {
- Source res = new Source(host, datasetId, query, namespace);
+ Source res = new Source(projectId, query, namespace);
res.mockSplitter = mockSplitter;
res.mockEstimateSizeBytes = estimateSizeBytes;
return res;
@@ -536,23 +501,23 @@ public class DatastoreIO {
///////////////////// Write Class /////////////////////////////////
/**
- * Returns a new {@link DatastoreIO.Sink} builder using the default host.
- * You need to further configure it using {@link DatastoreIO.Sink#withDataset}, and optionally
- * {@link DatastoreIO.Sink#withHost} before using it in a {@link Write} transform.
+ * Returns a new {@link DatastoreIO.Sink} builder.
+ * You need to further configure it using {@link DatastoreIO.Sink#withProject}, before using it
+ * in a {@link Write} transform.
*
- * <p>For example: {@code p.apply(Write.to(DatastoreIO.sink().withDataset(dataset)));}
+ * <p>For example: {@code p.apply(Write.to(DatastoreIO.sink().withProject(projectId)));}
*/
public static Sink sink() {
- return new Sink(DEFAULT_HOST, null);
+ return new Sink(null);
}
/**
* Returns a new {@link Write} transform that will write to a {@link Sink}.
*
- * <p>For example: {@code p.apply(DatastoreIO.writeTo(dataset));}
+ * <p>For example: {@code p.apply(DatastoreIO.writeTo(projectId));}
*/
- public static Write.Bound<Entity> writeTo(String datasetId) {
- return Write.to(sink().withDataset(datasetId));
+ public static Write.Bound<Entity> writeTo(String projectId) {
+ return Write.to(sink().withProject(projectId));
}
/**
@@ -561,44 +526,31 @@ public class DatastoreIO {
*
*/
public static class Sink extends org.apache.beam.sdk.io.Sink<Entity> {
- final String host;
- final String datasetId;
+ final String projectId;
/**
- * Returns a {@link Sink} that is like this one, but will write to the specified dataset.
+ * Returns a {@link Sink} that is like this one, but will write to the specified project.
*/
- public Sink withDataset(String datasetId) {
- checkNotNull(datasetId, "datasetId");
- return new Sink(host, datasetId);
+ public Sink withProject(String projectId) {
+ checkNotNull(projectId, "projectId");
+ return new Sink(projectId);
}
/**
- * Returns a {@link Sink} that is like this one, but will use the given host. If not specified,
- * the {@link DatastoreIO#DEFAULT_HOST default host} will be used.
+ * Constructs a Sink with the given project.
*/
- public Sink withHost(String host) {
- checkNotNull(host, "host");
- return new Sink(host, datasetId);
+ protected Sink(String projectId) {
+ this.projectId = projectId;
}
/**
- * Constructs a Sink with given host and dataset.
- */
- protected Sink(String host, String datasetId) {
- this.host = checkNotNull(host, "host");
- this.datasetId = datasetId;
- }
-
- /**
- * Ensures the host and dataset are set.
+ * Ensures the project is set.
*/
@Override
public void validate(PipelineOptions options) {
Preconditions.checkNotNull(
- host, "Host is a required parameter. Please use withHost to set the host.");
- Preconditions.checkNotNull(
- datasetId,
- "Dataset ID is a required parameter. Please use withDataset to to set the datasetId.");
+ projectId,
+ "Project ID is a required parameter. Please use withProject to to set the projectId.");
}
@Override
@@ -610,10 +562,8 @@ public class DatastoreIO {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder
- .addIfNotDefault(DisplayData.item("host", host)
- .withLabel("Datastore Service"), DEFAULT_HOST)
- .addIfNotNull(DisplayData.item("dataset", datasetId)
- .withLabel("Output Dataset"));
+ .addIfNotNull(DisplayData.item("project", projectId)
+ .withLabel("Output Project"));
}
}
@@ -655,8 +605,7 @@ public class DatastoreIO {
public DatastoreWriter createWriter(PipelineOptions options) throws Exception {
DatastoreOptions.Builder builder =
new DatastoreOptions.Builder()
- .host(sink.host)
- .dataset(sink.datasetId)
+ .projectId(sink.projectId)
.initializer(new RetryHttpRequestInitializer());
Credential credential = options.as(GcpOptions.class).getGcpCredential();
if (credential != null) {
@@ -716,12 +665,12 @@ public class DatastoreIO {
* has either an id or a name.
*/
static boolean isValidKey(Key key) {
- List<PathElement> elementList = key.getPathElementList();
+ List<PathElement> elementList = key.getPathList();
if (elementList.isEmpty()) {
return false;
}
PathElement lastElement = elementList.get(elementList.size() - 1);
- return (lastElement.hasId() || lastElement.hasName());
+ return (lastElement.getId() != 0 || !lastElement.getName().isEmpty());
}
// Visible for testing
@@ -789,13 +738,13 @@ public class DatastoreIO {
// Batch upsert entities.
try {
CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- commitRequest.getMutationBuilder().addAllUpsert(entities);
+ for (Entity entity: entities) {
+ commitRequest.addMutations(makeUpsert(entity));
+ }
commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
datastore.commit(commitRequest.build());
-
// Break if the commit threw no exception.
break;
-
} catch (DatastoreException exception) {
// Only log the code and message for potentially-transient errors. The entire exception
// will be propagated upon the last retry.
@@ -878,7 +827,8 @@ public class DatastoreIO {
this.source = source;
this.datastore = datastore;
// If the user set a limit on the query, remember it. Otherwise pin to MAX_VALUE.
- userLimit = source.query.hasLimit() ? source.query.getLimit() : Integer.MAX_VALUE;
+ userLimit = source.query.hasLimit()
+ ? source.query.getLimit().getValue() : Integer.MAX_VALUE;
}
@Override
@@ -950,8 +900,8 @@ public class DatastoreIO {
*/
private Iterator<EntityResult> getIteratorAndMoveCursor() throws DatastoreException {
Query.Builder query = source.query.toBuilder().clone();
- query.setLimit(Math.min(userLimit, QUERY_BATCH_LIMIT));
- if (currentBatch != null && currentBatch.hasEndCursor()) {
+ query.setLimit(Int32Value.newBuilder().setValue(Math.min(userLimit, QUERY_BATCH_LIMIT)));
+ if (currentBatch != null && !currentBatch.getEndCursor().isEmpty()) {
query.setStartCursor(currentBatch.getEndCursor());
}
@@ -963,7 +913,7 @@ public class DatastoreIO {
// MORE_RESULTS_AFTER_LIMIT is not implemented yet:
// https://groups.google.com/forum/#!topic/gcd-discuss/iNs6M1jA2Vw, so
// use result count to determine if more results might exist.
- int numFetch = currentBatch.getEntityResultCount();
+ int numFetch = currentBatch.getEntityResultsCount();
if (source.query.hasLimit()) {
verify(userLimit >= numFetch,
"Expected userLimit %s >= numFetch %s, because query limit %s should be <= userLimit",
@@ -982,7 +932,7 @@ public class DatastoreIO {
return null;
}
- return currentBatch.getEntityResultList().iterator();
+ return currentBatch.getEntityResultsList().iterator();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java
deleted file mode 100644
index 2a22925..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/EntityCoderTest.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.coders;
-
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeProperty;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeValue;
-
-import org.apache.beam.sdk.testing.CoderProperties;
-import org.apache.beam.sdk.util.CoderUtils;
-
-import com.google.api.services.datastore.DatastoreV1.Entity;
-
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Test case for {@link EntityCoder}.
- */
-@RunWith(JUnit4.class)
-public class EntityCoderTest {
-
- private static final Coder<Entity> TEST_CODER = EntityCoder.of();
-
- // Presumably if anything works, everything works,
- // as actual serialization is fully delegated to
- // autogenerated code from a well-tested library.
- private static final List<Entity> TEST_VALUES = Arrays.<Entity>asList(
- Entity.newBuilder()
- .setKey(makeKey("TestKind", "emptyEntity"))
- .build(),
- Entity.newBuilder()
- .setKey(makeKey("TestKind", "testSimpleProperties"))
- .addProperty(makeProperty("trueProperty", makeValue(true)))
- .addProperty(makeProperty("falseProperty", makeValue(false)))
- .addProperty(makeProperty("stringProperty", makeValue("hello")))
- .addProperty(makeProperty("integerProperty", makeValue(3)))
- .addProperty(makeProperty("doubleProperty", makeValue(-1.583257)))
- .build(),
- Entity.newBuilder()
- .setKey(makeKey("TestKind", "testNestedEntity"))
- .addProperty(makeProperty("entityProperty",
- makeValue(Entity.newBuilder()
- .addProperty(makeProperty("stringProperty", makeValue("goodbye"))))))
- .build());
-
- @Test
- public void testDecodeEncodeEqual() throws Exception {
- for (Entity value : TEST_VALUES) {
- CoderProperties.coderDecodeEncodeEqual(TEST_CODER, value);
- }
- }
-
- // If this changes, it implies the binary format has changed.
- private static final String EXPECTED_ENCODING_ID = "";
-
- @Test
- public void testEncodingId() throws Exception {
- CoderProperties.coderHasEncodingId(TEST_CODER, EXPECTED_ENCODING_ID);
- }
-
- /**
- * Generated data to check that the wire format has not changed. To regenerate, see
- * {@link org.apache.beam.sdk.coders.PrintBase64Encodings}.
- */
- private static final List<String> TEST_ENCODINGS = Arrays.asList(
- "AAAAGwoZEhcKCFRlc3RLaW5kGgtlbXB0eUVudGl0eQ",
- "AAAAnQoiEiAKCFRlc3RLaW5kGhR0ZXN0U2ltcGxlUHJvcGVydGllcxISCgx0cnVlUHJvcGVydHkiAggBEhMKDWZhbHNl"
- + "UHJvcGVydHkiAggAEhoKDnN0cmluZ1Byb3BlcnR5IgiKAQVoZWxsbxIVCg9pbnRlZ2VyUHJvcGVydHkiAhADEh"
- + "sKDmRvdWJsZVByb3BlcnR5IgkZ8ZvCSgVV-b8",
- "AAAAVAoeEhwKCFRlc3RLaW5kGhB0ZXN0TmVzdGVkRW50aXR5EjIKDmVudGl0eVByb3BlcnR5IiAyHhIcCg5zdHJpbmdQ"
- + "cm9wZXJ0eSIKigEHZ29vZGJ5ZQ");
-
- @Test
- public void testWireFormatEncode() throws Exception {
- CoderProperties.coderEncodesBase64(TEST_CODER, TEST_VALUES, TEST_ENCODINGS);
- }
-
- @Rule
- public ExpectedException thrown = ExpectedException.none();
-
- @Test
- public void encodeNullThrowsCoderException() throws Exception {
- thrown.expect(CoderException.class);
- thrown.expectMessage("cannot encode a null Entity");
-
- CoderUtils.encodeToBase64(TEST_CODER, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
index b41c340..14fe4d8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/coders/protobuf/ProtobufUtilTest.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertThat;
import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import com.google.api.services.datastore.DatastoreV1.Entity;
import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages;
import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageA;
import com.google.cloud.dataflow.sdk.coders.Proto2CoderTestMessages.MessageB;
@@ -152,9 +151,9 @@ public class ProtobufUtilTest {
}
@Test
- public void testEntityIsDeterministic() throws NonDeterministicException {
- // Cloud Datastore's Entities can be encoded deterministically.
- verifyDeterministic(ProtoCoder.of(Entity.class));
+ public void testDurationIsDeterministic() throws NonDeterministicException {
+ // Duration can be encoded deterministically.
+ verifyDeterministic(ProtoCoder.of(Duration.class));
}
@Test
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
index 622abb2..2aca190 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DatastoreIOTest.java
@@ -19,7 +19,7 @@ package org.apache.beam.sdk.io;
import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
-import static com.google.api.services.datastore.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1beta3.client.DatastoreHelper.makeKey;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
@@ -47,21 +47,22 @@ import org.apache.beam.sdk.testing.ExpectedLogs;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.TestCredential;
-import com.google.api.services.datastore.DatastoreV1.Entity;
-import com.google.api.services.datastore.DatastoreV1.EntityResult;
-import com.google.api.services.datastore.DatastoreV1.Key;
-import com.google.api.services.datastore.DatastoreV1.KindExpression;
-import com.google.api.services.datastore.DatastoreV1.PartitionId;
-import com.google.api.services.datastore.DatastoreV1.PropertyFilter;
-import com.google.api.services.datastore.DatastoreV1.Query;
-import com.google.api.services.datastore.DatastoreV1.QueryResultBatch;
-import com.google.api.services.datastore.DatastoreV1.RunQueryRequest;
-import com.google.api.services.datastore.DatastoreV1.RunQueryResponse;
-import com.google.api.services.datastore.DatastoreV1.Value;
-import com.google.api.services.datastore.client.Datastore;
-import com.google.api.services.datastore.client.DatastoreHelper;
-import com.google.api.services.datastore.client.QuerySplitter;
import com.google.common.collect.Lists;
+import com.google.datastore.v1beta3.Entity;
+import com.google.datastore.v1beta3.EntityResult;
+import com.google.datastore.v1beta3.Key;
+import com.google.datastore.v1beta3.KindExpression;
+import com.google.datastore.v1beta3.PartitionId;
+import com.google.datastore.v1beta3.PropertyFilter;
+import com.google.datastore.v1beta3.Query;
+import com.google.datastore.v1beta3.QueryResultBatch;
+import com.google.datastore.v1beta3.RunQueryRequest;
+import com.google.datastore.v1beta3.RunQueryResponse;
+import com.google.datastore.v1beta3.Value;
+import com.google.datastore.v1beta3.client.Datastore;
+import com.google.datastore.v1beta3.client.DatastoreHelper;
+import com.google.datastore.v1beta3.client.QuerySplitter;
+import com.google.protobuf.Int32Value;
import org.junit.Before;
import org.junit.Rule;
@@ -84,8 +85,7 @@ import java.util.NoSuchElementException;
*/
@RunWith(JUnit4.class)
public class DatastoreIOTest {
- private static final String HOST = "testHost";
- private static final String DATASET = "testDataset";
+ private static final String PROJECT = "testProject";
private static final String NAMESPACE = "testNamespace";
private static final String KIND = "testKind";
private static final Query QUERY;
@@ -108,7 +108,7 @@ public class DatastoreIOTest {
public void setUp() {
MockitoAnnotations.initMocks(this);
initialSource = DatastoreIO.source()
- .withHost(HOST).withDataset(DATASET).withQuery(QUERY).withNamespace(NAMESPACE);
+ .withProject(PROJECT).withQuery(QUERY).withNamespace(NAMESPACE);
}
/**
@@ -123,10 +123,9 @@ public class DatastoreIOTest {
@Test
public void testBuildSource() throws Exception {
DatastoreIO.Source source = DatastoreIO.source()
- .withHost(HOST).withDataset(DATASET).withQuery(QUERY).withNamespace(NAMESPACE);
+ .withProject(PROJECT).withQuery(QUERY).withNamespace(NAMESPACE);
assertEquals(QUERY, source.getQuery());
- assertEquals(DATASET, source.getDataset());
- assertEquals(HOST, source.getHost());
+ assertEquals(PROJECT, source.getProjectId());
assertEquals(NAMESPACE, source.getNamespace());
}
@@ -136,33 +135,23 @@ public class DatastoreIOTest {
@Test
public void testBuildSourceAlt() throws Exception {
DatastoreIO.Source source = DatastoreIO.source()
- .withDataset(DATASET).withNamespace(NAMESPACE).withQuery(QUERY).withHost(HOST);
+ .withProject(PROJECT).withNamespace(NAMESPACE).withQuery(QUERY);
assertEquals(QUERY, source.getQuery());
- assertEquals(DATASET, source.getDataset());
- assertEquals(HOST, source.getHost());
+ assertEquals(PROJECT, source.getProjectId());
assertEquals(NAMESPACE, source.getNamespace());
}
@Test
- public void testSourceValidationFailsHost() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("host");
-
- DatastoreIO.Source source = initialSource.withHost(null);
- source.validate();
- }
-
- @Test
- public void testSourceValidationFailsDataset() throws Exception {
+ public void testSourceValidationFailsProject() throws Exception {
DatastoreIO.Source source = DatastoreIO.source().withQuery(QUERY);
thrown.expect(NullPointerException.class);
- thrown.expectMessage("dataset");
+ thrown.expectMessage("project");
source.validate();
}
@Test
public void testSourceValidationFailsQuery() throws Exception {
- DatastoreIO.Source source = DatastoreIO.source().withDataset(DATASET);
+ DatastoreIO.Source source = DatastoreIO.source().withProject(PROJECT);
thrown.expect(NullPointerException.class);
thrown.expectMessage("query");
source.validate();
@@ -170,25 +159,25 @@ public class DatastoreIOTest {
@Test
public void testSourceValidationFailsQueryLimitZero() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(0).build();
+ Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit 0");
+ thrown.expectMessage("Invalid query limit 0: must be positive");
DatastoreIO.source().withQuery(invalidLimit);
}
@Test
public void testSourceValidationFailsQueryLimitNegative() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(-5).build();
+ Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit -5");
+ thrown.expectMessage("Invalid query limit -5: must be positive");
DatastoreIO.source().withQuery(invalidLimit);
}
@Test
public void testSourceValidationSucceedsNamespace() throws Exception {
- DatastoreIO.Source source = DatastoreIO.source().withDataset(DATASET).withQuery(QUERY);
+ DatastoreIO.Source source = DatastoreIO.source().withProject(PROJECT).withQuery(QUERY);
/* Should succeed, as a null namespace is fine. */
source.validate();
}
@@ -196,61 +185,49 @@ public class DatastoreIOTest {
@Test
public void testSourceDisplayData() {
DatastoreIO.Source source = DatastoreIO.source()
- .withDataset(DATASET)
+ .withProject(PROJECT)
.withQuery(QUERY)
- .withHost(HOST)
.withNamespace(NAMESPACE);
DisplayData displayData = DisplayData.from(source);
- assertThat(displayData, hasDisplayItem("dataset", DATASET));
+ assertThat(displayData, hasDisplayItem("project", PROJECT));
assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
- assertThat(displayData, hasDisplayItem("host", HOST));
assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
}
@Test
- public void testSinkDoesNotAllowNullHost() throws Exception {
- thrown.expect(NullPointerException.class);
- thrown.expectMessage("host");
-
- DatastoreIO.sink().withDataset(DATASET).withHost(null);
- }
-
- @Test
- public void testSinkDoesNotAllowNullDataset() throws Exception {
+ public void testSinkDoesNotAllowNullProject() throws Exception {
thrown.expect(NullPointerException.class);
- thrown.expectMessage("datasetId");
+ thrown.expectMessage("projectId");
- DatastoreIO.sink().withDataset(null);
+ DatastoreIO.sink().withProject(null);
}
@Test
- public void testSinkValidationFailsWithNoDataset() throws Exception {
+ public void testSinkValidationFailsWithNoProject() throws Exception {
DatastoreIO.Sink sink = DatastoreIO.sink();
thrown.expect(NullPointerException.class);
- thrown.expectMessage("Dataset");
+ thrown.expectMessage("Project");
sink.validate(testPipelineOptions());
}
@Test
- public void testSinkValidationSucceedsWithDataset() throws Exception {
- DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET);
+ public void testSinkValidationSucceedsWithProject() throws Exception {
+ DatastoreIO.Sink sink = DatastoreIO.sink().withProject(PROJECT);
sink.validate(testPipelineOptions());
}
@Test
public void testSinkDisplayData() {
DatastoreIO.Sink sink = DatastoreIO.sink()
- .withDataset(DATASET)
- .withHost(HOST);
+ .withProject(PROJECT);
DisplayData displayData = DisplayData.from(sink);
- assertThat(displayData, hasDisplayItem("dataset", DATASET));
- assertThat(displayData, hasDisplayItem("host", HOST));
+ assertThat(displayData, hasDisplayItem("project", PROJECT));
}
@Test
@@ -303,7 +280,7 @@ public class DatastoreIOTest {
io.splitIntoBundles(1024, testPipelineOptions());
- PartitionId partition = PartitionId.newBuilder().setNamespace(NAMESPACE).build();
+ PartitionId partition = PartitionId.newBuilder().setNamespaceId(NAMESPACE).build();
verify(splitter).getSplits(eq(QUERY), eq(partition), eq(8), any(Datastore.class));
verifyNoMoreInteractions(splitter);
}
@@ -344,7 +321,7 @@ public class DatastoreIOTest {
@Test
public void testQueryDoesNotSplitWithLimitSet() throws Exception {
// Minimal query with a limit
- Query query = Query.newBuilder().setLimit(5).build();
+ Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(5)).build();
// Mock query splitter, should not be invoked.
QuerySplitter splitter = mock(QuerySplitter.class);
@@ -428,17 +405,14 @@ public class DatastoreIOTest {
*/
@Test
public void testBuildSink() throws Exception {
- DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET).withHost(HOST);
- assertEquals(HOST, sink.host);
- assertEquals(DATASET, sink.datasetId);
+ DatastoreIO.Sink sink = DatastoreIO.sink().withProject(PROJECT);
+ assertEquals(PROJECT, sink.projectId);
- sink = DatastoreIO.sink().withHost(HOST).withDataset(DATASET);
- assertEquals(HOST, sink.host);
- assertEquals(DATASET, sink.datasetId);
+ sink = DatastoreIO.sink().withProject(PROJECT);
+ assertEquals(PROJECT, sink.projectId);
- sink = DatastoreIO.sink().withDataset(DATASET).withHost(HOST);
- assertEquals(HOST, sink.host);
- assertEquals(DATASET, sink.datasetId);
+ sink = DatastoreIO.sink().withProject(PROJECT);
+ assertEquals(PROJECT, sink.projectId);
}
/**
@@ -446,13 +420,11 @@ public class DatastoreIOTest {
*/
@Test
public void testBuildSinkDefaults() throws Exception {
- DatastoreIO.Sink sink = DatastoreIO.sink().withDataset(DATASET);
- assertEquals(DatastoreIO.DEFAULT_HOST, sink.host);
- assertEquals(DATASET, sink.datasetId);
+ DatastoreIO.Sink sink = DatastoreIO.sink().withProject(PROJECT);
+ assertEquals(PROJECT, sink.projectId);
- sink = DatastoreIO.sink().withDataset(DATASET);
- assertEquals(DatastoreIO.DEFAULT_HOST, sink.host);
- assertEquals(DATASET, sink.datasetId);
+ sink = DatastoreIO.sink().withProject(PROJECT);
+ assertEquals(PROJECT, sink.projectId);
}
/**
@@ -462,33 +434,33 @@ public class DatastoreIOTest {
public void testHasNameOrId() {
Key key;
// Complete with name, no ancestor
- key = DatastoreHelper.makeKey("bird", "finch").build();
+ key = makeKey("bird", "finch").build();
assertTrue(DatastoreWriter.isValidKey(key));
// Complete with id, no ancestor
- key = DatastoreHelper.makeKey("bird", 123).build();
+ key = makeKey("bird", 123).build();
assertTrue(DatastoreWriter.isValidKey(key));
// Incomplete, no ancestor
- key = DatastoreHelper.makeKey("bird").build();
+ key = makeKey("bird").build();
assertFalse(DatastoreWriter.isValidKey(key));
// Complete with name and ancestor
- key = DatastoreHelper.makeKey("bird", "owl").build();
- key = DatastoreHelper.makeKey(key, "bird", "horned").build();
+ key = makeKey("bird", "owl").build();
+ key = makeKey(key, "bird", "horned").build();
assertTrue(DatastoreWriter.isValidKey(key));
// Complete with id and ancestor
- key = DatastoreHelper.makeKey("bird", "owl").build();
- key = DatastoreHelper.makeKey(key, "bird", 123).build();
+ key = makeKey("bird", "owl").build();
+ key = makeKey(key, "bird", 123).build();
assertTrue(DatastoreWriter.isValidKey(key));
// Incomplete with ancestor
- key = DatastoreHelper.makeKey("bird", "owl").build();
- key = DatastoreHelper.makeKey(key, "bird").build();
+ key = makeKey("bird", "owl").build();
+ key = makeKey(key, "bird").build();
assertFalse(DatastoreWriter.isValidKey(key));
- key = DatastoreHelper.makeKey().build();
+ key = makeKey().build();
assertFalse(DatastoreWriter.isValidKey(key));
}
@@ -497,7 +469,7 @@ public class DatastoreIOTest {
*/
@Test
public void testAddEntitiesWithIncompleteKeys() throws Exception {
- Key key = DatastoreHelper.makeKey("bird").build();
+ Key key = makeKey("bird").build();
Entity entity = Entity.newBuilder().setKey(key).build();
DatastoreWriter writer = new DatastoreIO.DatastoreWriter(null, mockDatastore);
@@ -513,9 +485,9 @@ public class DatastoreIOTest {
@Test
public void testAddingEntities() throws Exception {
List<Entity> expected = Lists.newArrayList(
- Entity.newBuilder().setKey(DatastoreHelper.makeKey("bird", "jay").build()).build(),
- Entity.newBuilder().setKey(DatastoreHelper.makeKey("bird", "condor").build()).build(),
- Entity.newBuilder().setKey(DatastoreHelper.makeKey("bird", "robin").build()).build());
+ Entity.newBuilder().setKey(makeKey("bird", "jay").build()).build(),
+ Entity.newBuilder().setKey(makeKey("bird", "condor").build()).build(),
+ Entity.newBuilder().setKey(makeKey("bird", "robin").build()).build());
List<Entity> allEntities = Lists.newArrayList(expected);
Collections.shuffle(allEntities);
@@ -543,7 +515,7 @@ public class DatastoreIOTest {
assertTrue(q.hasLimit());
// The limit should be in the range [1, DATASTORE_QUERY_BATCH_LIMIT]
- int limit = q.getLimit();
+ int limit = q.getLimit().getValue();
assertThat(limit, greaterThanOrEqualTo(1));
assertThat(limit, lessThanOrEqualTo(DATASTORE_QUERY_BATCH_LIMIT));
@@ -559,7 +531,7 @@ public class DatastoreIOTest {
// Fill out the other parameters on the returned result batch.
RunQueryResponse.Builder ret = RunQueryResponse.newBuilder();
ret.getBatchBuilder()
- .addAllEntityResult(entities)
+ .addAllEntityResults(entities)
.setEntityResultType(EntityResult.ResultType.FULL)
.setMoreResults(
limit == DATASTORE_QUERY_BATCH_LIMIT
@@ -572,8 +544,9 @@ public class DatastoreIOTest {
/** Helper function to run a test reading from a limited-result query. */
private void runQueryLimitReadTest(int numEntities) throws Exception {
// An empty query to read entities.
- Query query = Query.newBuilder().setLimit(numEntities).build();
- DatastoreIO.Source source = DatastoreIO.source().withQuery(query).withDataset("mockDataset");
+ Query query = Query.newBuilder().setLimit(
+ Int32Value.newBuilder().setValue(numEntities)).build();
+ DatastoreIO.Source source = DatastoreIO.source().withQuery(query).withProject("mockProject");
// Use mockResponseForQuery to generate results.
when(mockDatastore.runQuery(any(RunQueryRequest.class)))
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/104f4dd2/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
index 6e87e91..b3f8743 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ApiSurfaceTest.java
@@ -79,14 +79,16 @@ public class ApiSurfaceTest {
inPackage("com.google.api.client"),
inPackage("com.google.api.services.bigquery"),
inPackage("com.google.api.services.dataflow"),
- inPackage("com.google.api.services.datastore"),
inPackage("com.google.api.services.pubsub"),
inPackage("com.google.api.services.storage"),
inPackage("com.google.auth"),
inPackage("com.google.bigtable.v1"),
inPackage("com.google.cloud.bigtable.config"),
inPackage("com.google.cloud.bigtable.grpc"),
+ inPackage("com.google.datastore"),
inPackage("com.google.protobuf"),
+ inPackage("com.google.rpc"),
+ inPackage("com.google.type"),
inPackage("com.fasterxml.jackson.annotation"),
inPackage("io.grpc"),
inPackage("org.apache.avro"),