You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ch...@apache.org on 2022/07/27 01:16:12 UTC
[beam] branch master updated: [#22051]: Add read_time support to Google Cloud Datastore connector (#22052)
This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new edd915d6956 [#22051]: Add read_time support to Google Cloud Datastore connector (#22052)
edd915d6956 is described below
commit edd915d6956fe6e24ac37dd9255fd20e15e93920
Author: yixiaoshen <yi...@google.com>
AuthorDate: Tue Jul 26 18:16:05 2022 -0700
[#22051]: Add read_time support to Google Cloud Datastore connector (#22052)
* [#22051]: add read_time support in Google Cloud Datastore connector.
* Re-arrange @Nullable annotation position.
---
.../org/apache/beam/gradle/BeamModulePlugin.groovy | 4 +-
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 167 ++-
.../beam/sdk/io/gcp/datastore/DatastoreV1Test.java | 1531 +++++++++++---------
.../beam/sdk/io/gcp/datastore/SplitQueryFnIT.java | 19 +-
.../apache/beam/sdk/io/gcp/datastore/V1ReadIT.java | 58 +-
5 files changed, 1023 insertions(+), 756 deletions(-)
diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
index c7d8a55145b..2e5cf0eccf2 100644
--- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
+++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
@@ -592,7 +592,7 @@ class BeamModulePlugin implements Plugin<Project> {
google_cloud_core_grpc : "com.google.cloud:google-cloud-core-grpc", // google_cloud_platform_libraries_bom sets version
google_cloud_datacatalog_v1beta1 : "com.google.cloud:google-cloud-datacatalog", // google_cloud_platform_libraries_bom sets version
google_cloud_dataflow_java_proto_library_all: "com.google.cloud.dataflow:google-cloud-dataflow-java-proto-library-all:0.5.160304",
- google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.2.10",
+ google_cloud_datastore_v1_proto_client : "com.google.cloud.datastore:datastore-v1-proto-client:2.9.0",
google_cloud_firestore : "com.google.cloud:google-cloud-firestore", // google_cloud_platform_libraries_bom sets version
google_cloud_pubsub : "com.google.cloud:google-cloud-pubsub", // google_cloud_platform_libraries_bom sets version
google_cloud_pubsublite : "com.google.cloud:google-cloud-pubsublite", // google_cloud_platform_libraries_bom sets version
@@ -687,7 +687,7 @@ class BeamModulePlugin implements Plugin<Project> {
proto_google_cloud_bigtable_admin_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-admin-v2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_bigtable_v2 : "com.google.api.grpc:proto-google-cloud-bigtable-v2", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_datacatalog_v1beta1 : "com.google.api.grpc:proto-google-cloud-datacatalog-v1beta1", // google_cloud_platform_libraries_bom sets version
- proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1:0.93.10", // google_cloud_platform_libraries_bom sets version
+ proto_google_cloud_datastore_v1 : "com.google.api.grpc:proto-google-cloud-datastore-v1:0.100.0", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_firestore_v1 : "com.google.api.grpc:proto-google-cloud-firestore-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsub_v1 : "com.google.api.grpc:proto-google-cloud-pubsub-v1", // google_cloud_platform_libraries_bom sets version
proto_google_cloud_pubsublite_v1 : "com.google.api.grpc:proto-google-cloud-pubsublite-v1", // google_cloud_platform_libraries_bom sets version
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 9137335943c..06b7c5292fd 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -45,6 +45,7 @@ import com.google.datastore.v1.Mutation;
import com.google.datastore.v1.PartitionId;
import com.google.datastore.v1.Query;
import com.google.datastore.v1.QueryResultBatch;
+import com.google.datastore.v1.ReadOptions;
import com.google.datastore.v1.RunQueryRequest;
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.client.Datastore;
@@ -54,6 +55,8 @@ import com.google.datastore.v1.client.DatastoreHelper;
import com.google.datastore.v1.client.DatastoreOptions;
import com.google.datastore.v1.client.QuerySplitter;
import com.google.protobuf.Int32Value;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Timestamps;
import com.google.rpc.Code;
import java.io.IOException;
import java.io.Serializable;
@@ -320,6 +323,8 @@ public class DatastoreV1 {
public abstract @Nullable String getLocalhost();
+ public abstract @Nullable Instant getReadTime();
+
@Override
public abstract String toString();
@@ -339,6 +344,8 @@ public class DatastoreV1 {
abstract Builder setLocalhost(String localhost);
+ abstract Builder setReadTime(Instant readTime);
+
abstract Read build();
}
@@ -346,10 +353,11 @@ public class DatastoreV1 {
* Computes the number of splits to be performed on the given query by querying the estimated
* size from Cloud Datastore.
*/
- static int getEstimatedNumSplits(Datastore datastore, Query query, @Nullable String namespace) {
+ static int getEstimatedNumSplits(
+ Datastore datastore, Query query, @Nullable String namespace, @Nullable Instant readTime) {
int numSplits;
try {
- long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace);
+ long estimatedSizeBytes = getEstimatedSizeBytes(datastore, query, namespace, readTime);
LOG.info("Estimated size bytes for the query is: {}", estimatedSizeBytes);
numSplits =
(int)
@@ -370,7 +378,8 @@ public class DatastoreV1 {
* table.
*/
private static long queryLatestStatisticsTimestamp(
- Datastore datastore, @Nullable String namespace) throws DatastoreException {
+ Datastore datastore, @Nullable String namespace, @Nullable Instant readTime)
+ throws DatastoreException {
Query.Builder query = Query.newBuilder();
// Note: namespace either being null or empty represents the default namespace, in which
// case we treat it as not provided by the user.
@@ -381,7 +390,7 @@ public class DatastoreV1 {
}
query.addOrder(makeOrder("timestamp", DESCENDING));
query.setLimit(Int32Value.newBuilder().setValue(1));
- RunQueryRequest request = makeRequest(query.build(), namespace);
+ RunQueryRequest request = makeRequest(query.build(), namespace, readTime);
RunQueryResponse response = datastore.runQuery(request);
QueryResultBatch batch = response.getBatch();
@@ -392,10 +401,14 @@ public class DatastoreV1 {
return entity.getProperties().get("timestamp").getTimestampValue().getSeconds() * 1000000;
}
- /** Retrieve latest table statistics for a given kind, namespace, and datastore. */
+ /**
+ * Retrieve latest table statistics for a given kind, namespace, and datastore. If the Read has
+ * readTime specified, the latest statistics at or before readTime are retrieved.
+ */
private static Entity getLatestTableStats(
- String ourKind, @Nullable String namespace, Datastore datastore) throws DatastoreException {
- long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
+ String ourKind, @Nullable String namespace, Datastore datastore, @Nullable Instant readTime)
+ throws DatastoreException {
+ long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace, readTime);
LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);
Query.Builder queryBuilder = Query.newBuilder();
@@ -410,7 +423,7 @@ public class DatastoreV1 {
makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(),
makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build()));
- RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+ RunQueryRequest request = makeRequest(queryBuilder.build(), namespace, readTime);
long now = System.currentTimeMillis();
RunQueryResponse response = datastore.runQuery(request);
@@ -433,10 +446,11 @@ public class DatastoreV1 {
*
* <p>See https://cloud.google.com/datastore/docs/concepts/stats.
*/
- static long getEstimatedSizeBytes(Datastore datastore, Query query, @Nullable String namespace)
+ static long getEstimatedSizeBytes(
+ Datastore datastore, Query query, @Nullable String namespace, @Nullable Instant readTime)
throws DatastoreException {
String ourKind = query.getKind(0).getName();
- Entity entity = getLatestTableStats(ourKind, namespace, datastore);
+ Entity entity = getLatestTableStats(ourKind, namespace, datastore, readTime);
return entity.getProperties().get("entity_bytes").getIntegerValue();
}
@@ -451,21 +465,38 @@ public class DatastoreV1 {
return partitionBuilder;
}
- /** Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}. */
- static RunQueryRequest makeRequest(Query query, @Nullable String namespace) {
- return RunQueryRequest.newBuilder()
- .setQuery(query)
- .setPartitionId(forNamespace(namespace))
- .build();
+ /**
+ * Builds a {@link RunQueryRequest} from the {@code query} and {@code namespace}, optionally at
+ * the requested {@code readTime}.
+ */
+ static RunQueryRequest makeRequest(
+ Query query, @Nullable String namespace, @Nullable Instant readTime) {
+ RunQueryRequest.Builder request =
+ RunQueryRequest.newBuilder().setQuery(query).setPartitionId(forNamespace(namespace));
+ if (readTime != null) {
+ Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+ request.setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto).build());
+ }
+ return request.build();
}
@VisibleForTesting
- /** Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}. */
- static RunQueryRequest makeRequest(GqlQuery gqlQuery, @Nullable String namespace) {
- return RunQueryRequest.newBuilder()
- .setGqlQuery(gqlQuery)
- .setPartitionId(forNamespace(namespace))
- .build();
+ /**
+ * Builds a {@link RunQueryRequest} from the {@code GqlQuery} and {@code namespace}, optionally
+ * at the requested {@code readTime}.
+ */
+ static RunQueryRequest makeRequest(
+ GqlQuery gqlQuery, @Nullable String namespace, @Nullable Instant readTime) {
+ RunQueryRequest.Builder request =
+ RunQueryRequest.newBuilder()
+ .setGqlQuery(gqlQuery)
+ .setPartitionId(forNamespace(namespace));
+ if (readTime != null) {
+ Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+ request.setReadOptions(ReadOptions.newBuilder().setReadTime(readTimeProto).build());
+ }
+
+ return request.build();
}
/**
@@ -477,10 +508,16 @@ public class DatastoreV1 {
@Nullable String namespace,
Datastore datastore,
QuerySplitter querySplitter,
- int numSplits)
+ int numSplits,
+ @Nullable Instant readTime)
throws DatastoreException {
// If namespace is set, include it in the split request so splits are calculated accordingly.
- return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore);
+ PartitionId partitionId = forNamespace(namespace).build();
+ if (readTime != null) {
+ Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+ return querySplitter.getSplits(query, partitionId, numSplits, datastore, readTimeProto);
+ }
+ return querySplitter.getSplits(query, partitionId, numSplits, datastore);
}
/**
@@ -497,11 +534,13 @@ public class DatastoreV1 {
* problem in practice.
*/
@VisibleForTesting
- static Query translateGqlQueryWithLimitCheck(String gql, Datastore datastore, String namespace)
+ static Query translateGqlQueryWithLimitCheck(
+ String gql, Datastore datastore, String namespace, @Nullable Instant readTime)
throws DatastoreException {
String gqlQueryWithZeroLimit = gql + " LIMIT 0";
try {
- Query translatedQuery = translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace);
+ Query translatedQuery =
+ translateGqlQuery(gqlQueryWithZeroLimit, datastore, namespace, readTime);
// Clear the limit that we set.
return translatedQuery.toBuilder().clearLimit().build();
} catch (DatastoreException e) {
@@ -512,7 +551,7 @@ public class DatastoreV1 {
LOG.warn("Failed to translate Gql query '{}': {}", gqlQueryWithZeroLimit, e.getMessage());
LOG.warn("User query might have a limit already set, so trying without zero limit");
// Retry without the zero limit.
- return translateGqlQuery(gql, datastore, namespace);
+ return translateGqlQuery(gql, datastore, namespace, readTime);
} else {
throw e;
}
@@ -520,10 +559,11 @@ public class DatastoreV1 {
}
/** Translates a gql query string to {@link Query}. */
- private static Query translateGqlQuery(String gql, Datastore datastore, String namespace)
+ private static Query translateGqlQuery(
+ String gql, Datastore datastore, String namespace, @Nullable Instant readTime)
throws DatastoreException {
GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
- RunQueryRequest req = makeRequest(gqlQuery, namespace);
+ RunQueryRequest req = makeRequest(gqlQuery, namespace, readTime);
return datastore.runQuery(req).getQuery();
}
@@ -628,6 +668,11 @@ public class DatastoreV1 {
return toBuilder().setLocalhost(localhost).build();
}
+ /** Returns a new {@link DatastoreV1.Read} that reads at the specified {@code readTime}. */
+ public DatastoreV1.Read withReadTime(Instant readTime) {
+ return toBuilder().setReadTime(readTime).build();
+ }
+
/** Returns Number of entities available for reading. */
public long getNumEntities(
PipelineOptions options, String ourKind, @Nullable String namespace) {
@@ -638,7 +683,7 @@ public class DatastoreV1 {
datastoreFactory.getDatastore(
options, v1Options.getProjectId(), v1Options.getLocalhost());
- Entity entity = getLatestTableStats(ourKind, namespace, datastore);
+ Entity entity = getLatestTableStats(ourKind, namespace, datastore, getReadTime());
return entity.getProperties().get("count").getIntegerValue();
} catch (Exception e) {
return -1;
@@ -688,13 +733,13 @@ public class DatastoreV1 {
inputQuery =
input
.apply(Create.ofProvider(getLiteralGqlQuery(), StringUtf8Coder.of()))
- .apply(ParDo.of(new GqlQueryTranslateFn(v1Options)));
+ .apply(ParDo.of(new GqlQueryTranslateFn(v1Options, getReadTime())));
}
return inputQuery
- .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits())))
+ .apply("Split", ParDo.of(new SplitQueryFn(v1Options, getNumQuerySplits(), getReadTime())))
.apply("Reshuffle", Reshuffle.viaRandomKey())
- .apply("Read", ParDo.of(new ReadFn(v1Options)));
+ .apply("Read", ParDo.of(new ReadFn(v1Options, getReadTime())));
}
@Override
@@ -705,7 +750,8 @@ public class DatastoreV1 {
.addIfNotNull(DisplayData.item("projectId", getProjectId()).withLabel("ProjectId"))
.addIfNotNull(DisplayData.item("namespace", getNamespace()).withLabel("Namespace"))
.addIfNotNull(DisplayData.item("query", query).withLabel("Query"))
- .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()).withLabel("GqlQuery"));
+ .addIfNotNull(DisplayData.item("gqlQuery", getLiteralGqlQuery()).withLabel("GqlQuery"))
+ .addIfNotNull(DisplayData.item("readTime", getReadTime()).withLabel("ReadTime"));
}
@VisibleForTesting
@@ -764,15 +810,22 @@ public class DatastoreV1 {
/** A DoFn that translates a Cloud Datastore gql query string to {@code Query}. */
static class GqlQueryTranslateFn extends DoFn<String, Query> {
private final V1Options v1Options;
+ private final @Nullable Instant readTime;
private transient Datastore datastore;
private final V1DatastoreFactory datastoreFactory;
GqlQueryTranslateFn(V1Options options) {
- this(options, new V1DatastoreFactory());
+ this(options, null, new V1DatastoreFactory());
}
- GqlQueryTranslateFn(V1Options options, V1DatastoreFactory datastoreFactory) {
+ GqlQueryTranslateFn(V1Options options, @Nullable Instant readTime) {
+ this(options, readTime, new V1DatastoreFactory());
+ }
+
+ GqlQueryTranslateFn(
+ V1Options options, @Nullable Instant readTime, V1DatastoreFactory datastoreFactory) {
this.v1Options = options;
+ this.readTime = readTime;
this.datastoreFactory = datastoreFactory;
}
@@ -788,7 +841,8 @@ public class DatastoreV1 {
String gqlQuery = c.element();
LOG.info("User query: '{}'", gqlQuery);
Query query =
- translateGqlQueryWithLimitCheck(gqlQuery, datastore, v1Options.getNamespace());
+ translateGqlQueryWithLimitCheck(
+ gqlQuery, datastore, v1Options.getNamespace(), readTime);
LOG.info("User gql query translated to Query({})", query);
c.output(query);
}
@@ -803,6 +857,8 @@ public class DatastoreV1 {
private final V1Options options;
// number of splits to make for a given query
private final int numSplits;
+ // read time at which to run the queries; null if not specified
+ private final @Nullable Instant readTime;
private final V1DatastoreFactory datastoreFactory;
// Datastore client
@@ -811,14 +867,23 @@ public class DatastoreV1 {
private transient QuerySplitter querySplitter;
public SplitQueryFn(V1Options options, int numSplits) {
- this(options, numSplits, new V1DatastoreFactory());
+ this(options, numSplits, null, new V1DatastoreFactory());
+ }
+
+ public SplitQueryFn(V1Options options, int numSplits, @Nullable Instant readTime) {
+ this(options, numSplits, readTime, new V1DatastoreFactory());
}
@VisibleForTesting
- SplitQueryFn(V1Options options, int numSplits, V1DatastoreFactory datastoreFactory) {
+ SplitQueryFn(
+ V1Options options,
+ int numSplits,
+ @Nullable Instant readTime,
+ V1DatastoreFactory datastoreFactory) {
this.options = options;
this.numSplits = numSplits;
this.datastoreFactory = datastoreFactory;
+ this.readTime = readTime;
}
@StartBundle
@@ -842,7 +907,8 @@ public class DatastoreV1 {
int estimatedNumSplits;
// Compute the estimated numSplits if numSplits is not specified by the user.
if (numSplits <= 0) {
- estimatedNumSplits = getEstimatedNumSplits(datastore, query, options.getNamespace());
+ estimatedNumSplits =
+ getEstimatedNumSplits(datastore, query, options.getNamespace(), readTime);
} else {
estimatedNumSplits = numSplits;
}
@@ -852,7 +918,12 @@ public class DatastoreV1 {
try {
querySplits =
splitQuery(
- query, options.getNamespace(), datastore, querySplitter, estimatedNumSplits);
+ query,
+ options.getNamespace(),
+ datastore,
+ querySplitter,
+ estimatedNumSplits,
+ readTime);
} catch (Exception e) {
LOG.warn("Unable to parallelize the given query: {}", query, e);
querySplits = ImmutableList.of(query);
@@ -873,6 +944,7 @@ public class DatastoreV1 {
DisplayData.item("numQuerySplits", numSplits)
.withLabel("Requested number of Query splits"));
}
+ builder.addIfNotNull(DisplayData.item("readTime", readTime).withLabel("ReadTime"));
}
}
@@ -880,6 +952,7 @@ public class DatastoreV1 {
@VisibleForTesting
static class ReadFn extends DoFn<Query, Entity> {
private final V1Options options;
+ private final @Nullable Instant readTime;
private final V1DatastoreFactory datastoreFactory;
// Datastore client
private transient Datastore datastore;
@@ -894,12 +967,17 @@ public class DatastoreV1 {
.withInitialBackoff(Duration.standardSeconds(5));
public ReadFn(V1Options options) {
- this(options, new V1DatastoreFactory());
+ this(options, null, new V1DatastoreFactory());
+ }
+
+ public ReadFn(V1Options options, @Nullable Instant readTime) {
+ this(options, readTime, new V1DatastoreFactory());
}
@VisibleForTesting
- ReadFn(V1Options options, V1DatastoreFactory datastoreFactory) {
+ ReadFn(V1Options options, @Nullable Instant readTime, V1DatastoreFactory datastoreFactory) {
this.options = options;
+ this.readTime = readTime;
this.datastoreFactory = datastoreFactory;
}
@@ -967,7 +1045,7 @@ public class DatastoreV1 {
queryBuilder.setStartCursor(currentBatch.getEndCursor());
}
- RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
+ RunQueryRequest request = makeRequest(queryBuilder.build(), namespace, readTime);
RunQueryResponse response = runQueryWithRetries(request);
currentBatch = response.getBatch();
@@ -1005,6 +1083,7 @@ public class DatastoreV1 {
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.include("options", options);
+ builder.addIfNotNull(DisplayData.item("readTime", readTime).withLabel("ReadTime"));
}
}
}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 0fc895b0d9e..4aed59c4da3 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -67,8 +67,12 @@ import com.google.datastore.v1.client.Datastore;
import com.google.datastore.v1.client.DatastoreException;
import com.google.datastore.v1.client.QuerySplitter;
import com.google.protobuf.Int32Value;
+import com.google.protobuf.Timestamp;
+import com.google.protobuf.util.Timestamps;
import com.google.rpc.Code;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -101,17 +105,23 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;
+import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
/** Tests for {@link DatastoreV1}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)
public class DatastoreV1Test {
private static final String PROJECT_ID = "testProject";
private static final String NAMESPACE = "testNamespace";
@@ -119,6 +129,8 @@ public class DatastoreV1Test {
private static final Query QUERY;
private static final String LOCALHOST = "localhost:9955";
private static final String GQL_QUERY = "SELECT * from " + KIND;
+ private static final Instant TIMESTAMP = Instant.now();
+ private static final Timestamp TIMESTAMP_PROTO = Timestamps.fromMillis(TIMESTAMP.getMillis());
private static final V1Options V_1_OPTIONS;
static {
@@ -128,9 +140,9 @@ public class DatastoreV1Test {
V_1_OPTIONS = V1Options.from(PROJECT_ID, NAMESPACE, null);
}
- @Mock private Datastore mockDatastore;
- @Mock QuerySplitter mockQuerySplitter;
- @Mock V1DatastoreFactory mockDatastoreFactory;
+ @Mock protected Datastore mockDatastore;
+ @Mock protected QuerySplitter mockQuerySplitter;
+ @Mock protected V1DatastoreFactory mockDatastoreFactory;
@Rule public final ExpectedException thrown = ExpectedException.none();
@@ -146,782 +158,890 @@ public class DatastoreV1Test {
MetricsEnvironment.setProcessWideContainer(container);
}
- @Test
- public void testBuildRead() throws Exception {
- DatastoreV1.Read read =
- DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId().get());
- assertEquals(NAMESPACE, read.getNamespace().get());
- }
-
- @Test
- public void testBuildReadWithGqlQuery() throws Exception {
- DatastoreV1.Read read =
- DatastoreIO.v1()
- .read()
- .withProjectId(PROJECT_ID)
- .withLiteralGqlQuery(GQL_QUERY)
- .withNamespace(NAMESPACE);
- assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
- assertEquals(PROJECT_ID, read.getProjectId().get());
- assertEquals(NAMESPACE, read.getNamespace().get());
- }
-
- /** {@link #testBuildRead} but constructed in a different order. */
- @Test
- public void testBuildReadAlt() throws Exception {
- DatastoreV1.Read read =
- DatastoreIO.v1()
- .read()
- .withQuery(QUERY)
- .withNamespace(NAMESPACE)
- .withProjectId(PROJECT_ID)
- .withLocalhost(LOCALHOST);
- assertEquals(QUERY, read.getQuery());
- assertEquals(PROJECT_ID, read.getProjectId().get());
- assertEquals(NAMESPACE, read.getNamespace().get());
- assertEquals(LOCALHOST, read.getLocalhost());
- }
-
- @Test
- public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
- DatastoreV1.Read read =
- DatastoreIO.v1()
- .read()
- .withProjectId(PROJECT_ID)
- .withLiteralGqlQuery(GQL_QUERY)
- .withQuery(QUERY);
-
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
- read.expand(null);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitZero() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit 0: must be positive");
-
- DatastoreIO.v1().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadValidationFailsQueryLimitNegative() throws Exception {
- Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Invalid query limit -5: must be positive");
-
- DatastoreIO.v1().read().withQuery(invalidLimit);
- }
-
- @Test
- public void testReadDisplayData() {
- DatastoreV1.Read read =
- DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+ @RunWith(JUnit4.class)
+ public static class SingletonTests extends DatastoreV1Test {
+ @Test
+ public void testBuildRead() throws Exception {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(QUERY)
+ .withNamespace(NAMESPACE);
+ assertEquals(QUERY, read.getQuery());
+ assertEquals(PROJECT_ID, read.getProjectId().get());
+ assertEquals(NAMESPACE, read.getNamespace().get());
+ }
- DisplayData displayData = DisplayData.from(read);
+ @Test
+ public void testBuildReadWithReadTime() throws Exception {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(QUERY)
+ .withReadTime(TIMESTAMP);
+ assertEquals(TIMESTAMP, read.getReadTime());
+ assertEquals(QUERY, read.getQuery());
+ assertEquals(PROJECT_ID, read.getProjectId().get());
+ }
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
- assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
- }
+ @Test
+ public void testBuildReadWithGqlQuery() throws Exception {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withLiteralGqlQuery(GQL_QUERY)
+ .withNamespace(NAMESPACE);
+ assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
+ assertEquals(PROJECT_ID, read.getProjectId().get());
+ assertEquals(NAMESPACE, read.getNamespace().get());
+ }
- @Test
- public void testReadDisplayDataWithGqlQuery() {
- DatastoreV1.Read read =
- DatastoreIO.v1()
- .read()
- .withProjectId(PROJECT_ID)
- .withLiteralGqlQuery(GQL_QUERY)
- .withNamespace(NAMESPACE);
+ /** {@link #testBuildRead} but constructed in a different order. */
+ @Test
+ public void testBuildReadAlt() throws Exception {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withReadTime(TIMESTAMP)
+ .withQuery(QUERY)
+ .withNamespace(NAMESPACE)
+ .withProjectId(PROJECT_ID)
+ .withLocalhost(LOCALHOST);
+ assertEquals(TIMESTAMP, read.getReadTime());
+ assertEquals(QUERY, read.getQuery());
+ assertEquals(PROJECT_ID, read.getProjectId().get());
+ assertEquals(NAMESPACE, read.getNamespace().get());
+ assertEquals(LOCALHOST, read.getLocalhost());
+ }
- DisplayData displayData = DisplayData.from(read);
+ @Test
+ public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withLiteralGqlQuery(GQL_QUERY)
+ .withQuery(QUERY);
+
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
+ read.expand(null);
+ }
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY));
- assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
- }
+ @Test
+ public void testReadValidationFailsQueryLimitZero() throws Exception {
+ Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid query limit 0: must be positive");
- @Test
- public void testSourcePrimitiveDisplayData() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- int numSplits = 98;
- PTransform<PBegin, PCollection<Entity>> read =
- DatastoreIO.v1()
- .read()
- .withProjectId(PROJECT_ID)
- .withQuery(Query.newBuilder().build())
- .withNumQuerySplits(numSplits);
-
- String assertMessage = "DatastoreIO read should include the '%s' in its primitive display data";
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
- assertThat(
- String.format(assertMessage, "project id"),
- displayData,
- hasItem(hasDisplayItem("projectId", PROJECT_ID)));
- assertThat(
- String.format(assertMessage, "number of query splits"),
- displayData,
- hasItem(hasDisplayItem("numQuerySplits", numSplits)));
- }
+ DatastoreIO.v1().read().withQuery(invalidLimit);
+ }
- @Test
- public void testWriteDisplayData() {
- Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+ @Test
+ public void testReadValidationFailsQueryLimitNegative() throws Exception {
+ Query invalidLimit =
+ Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Invalid query limit -5: must be positive");
- DisplayData displayData = DisplayData.from(write);
+ DatastoreIO.v1().read().withQuery(invalidLimit);
+ }
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
+ @Test
+ public void testReadDisplayData() {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(QUERY)
+ .withNamespace(NAMESPACE)
+ .withReadTime(TIMESTAMP);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ assertThat(displayData, hasDisplayItem("query", QUERY.toString()));
+ assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+ assertThat(displayData, hasDisplayItem("readTime", TIMESTAMP));
+ }
- @Test
- public void testDeleteEntityDisplayData() {
- DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
+ @Test
+ public void testReadDisplayDataWithGqlQuery() {
+ DatastoreV1.Read read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withLiteralGqlQuery(GQL_QUERY)
+ .withNamespace(NAMESPACE)
+ .withReadTime(TIMESTAMP);
+
+ DisplayData displayData = DisplayData.from(read);
+
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ assertThat(displayData, hasDisplayItem("gqlQuery", GQL_QUERY));
+ assertThat(displayData, hasDisplayItem("namespace", NAMESPACE));
+ assertThat(displayData, hasDisplayItem("readTime", TIMESTAMP));
+ }
- DisplayData displayData = DisplayData.from(deleteEntity);
+ @Test
+ public void testSourcePrimitiveDisplayData() {
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ int numSplits = 98;
+ PTransform<PBegin, PCollection<Entity>> read =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(PROJECT_ID)
+ .withQuery(Query.newBuilder().build())
+ .withNumQuerySplits(numSplits);
+
+ String assertMessage =
+ "DatastoreIO read should include the '%s' in its primitive display data";
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveSourceTransforms(read);
+ assertThat(
+ String.format(assertMessage, "project id"),
+ displayData,
+ hasItem(hasDisplayItem("projectId", PROJECT_ID)));
+ assertThat(
+ String.format(assertMessage, "number of query splits"),
+ displayData,
+ hasItem(hasDisplayItem("numQuerySplits", numSplits)));
+ }
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
+ @Test
+ public void testWriteDisplayData() {
+ Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
- @Test
- public void testDeleteKeyDisplayData() {
- DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
+ DisplayData displayData = DisplayData.from(write);
- DisplayData displayData = DisplayData.from(deleteKey);
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
+ @Test
+ public void testDeleteEntityDisplayData() {
+ DeleteEntity deleteEntity = DatastoreIO.v1().deleteEntity().withProjectId(PROJECT_ID);
- @Test
- public void testWritePrimitiveDisplayData() {
- int hintNumWorkers = 10;
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1().write().withProjectId("myProject").withHintNumWorkers(hintNumWorkers);
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat(
- "DatastoreIO write should include the project in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("projectId")));
- assertThat(
- "DatastoreIO write should include the upsertFn in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("upsertFn")));
- assertThat(
- "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
- displayData,
- hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
- }
+ DisplayData displayData = DisplayData.from(deleteEntity);
- @Test
- public void testWritePrimitiveDisplayDataDisabledThrottler() {
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1().write().withProjectId("myProject").withRampupThrottlingDisabled();
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat(
- "DatastoreIO write should include the project in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("projectId")));
- assertThat(
- "DatastoreIO write should include the upsertFn in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("upsertFn")));
- assertThat(
- "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
- displayData,
- not(hasItem(hasDisplayItem("hintNumWorkers"))));
- }
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
- @Test
- public void testDeleteEntityPrimitiveDisplayData() {
- int hintNumWorkers = 10;
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Entity>, ?> write =
- DatastoreIO.v1()
- .deleteEntity()
- .withProjectId("myProject")
- .withHintNumWorkers(hintNumWorkers);
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat(
- "DatastoreIO write should include the project in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("projectId")));
- assertThat(
- "DatastoreIO write should include the deleteEntityFn in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("deleteEntityFn")));
- assertThat(
- "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
- displayData,
- hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
- }
+ @Test
+ public void testDeleteKeyDisplayData() {
+ DeleteKey deleteKey = DatastoreIO.v1().deleteKey().withProjectId(PROJECT_ID);
- @Test
- public void testDeleteKeyPrimitiveDisplayData() {
- int hintNumWorkers = 10;
- DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
- PTransform<PCollection<Key>, ?> write =
- DatastoreIO.v1().deleteKey().withProjectId("myProject").withHintNumWorkers(hintNumWorkers);
-
- Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
- assertThat(
- "DatastoreIO write should include the project in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("projectId")));
- assertThat(
- "DatastoreIO write should include the deleteKeyFn in its primitive display data",
- displayData,
- hasItem(hasDisplayItem("deleteKeyFn")));
- assertThat(
- "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
- displayData,
- hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
- }
+ DisplayData displayData = DisplayData.from(deleteKey);
- /** Test building a Write using builder methods. */
- @Test
- public void testBuildWrite() throws Exception {
- DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
- assertEquals(PROJECT_ID, write.getProjectId());
- }
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
+ }
- /** Test the detection of complete and incomplete keys. */
- @Test
- public void testHasNameOrId() {
- Key key;
- // Complete with name, no ancestor
- key = makeKey("bird", "finch").build();
- assertTrue(isValidKey(key));
-
- // Complete with id, no ancestor
- key = makeKey("bird", 123).build();
- assertTrue(isValidKey(key));
-
- // Incomplete, no ancestor
- key = makeKey("bird").build();
- assertFalse(isValidKey(key));
-
- // Complete with name and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", "horned").build();
- assertTrue(isValidKey(key));
-
- // Complete with id and ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird", 123).build();
- assertTrue(isValidKey(key));
-
- // Incomplete with ancestor
- key = makeKey("bird", "owl").build();
- key = makeKey(key, "bird").build();
- assertFalse(isValidKey(key));
-
- key = makeKey().build();
- assertFalse(isValidKey(key));
- }
+ @Test
+ public void testWritePrimitiveDisplayData() {
+ int hintNumWorkers = 10;
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Entity>, ?> write =
+ DatastoreIO.v1().write().withProjectId("myProject").withHintNumWorkers(hintNumWorkers);
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+ assertThat(
+ "DatastoreIO write should include the project in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("projectId")));
+ assertThat(
+ "DatastoreIO write should include the upsertFn in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("upsertFn")));
+ assertThat(
+ "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+ displayData,
+ hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
+ }
- /** Test that entities with incomplete keys cannot be updated. */
- @Test
- public void testAddEntitiesWithIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- UpsertFn upsertFn = new UpsertFn();
+ @Test
+ public void testWritePrimitiveDisplayDataDisabledThrottler() {
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Entity>, ?> write =
+ DatastoreIO.v1().write().withProjectId("myProject").withRampupThrottlingDisabled();
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+ assertThat(
+ "DatastoreIO write should include the project in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("projectId")));
+ assertThat(
+ "DatastoreIO write should include the upsertFn in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("upsertFn")));
+ assertThat(
+ "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+ displayData,
+ not(hasItem(hasDisplayItem("hintNumWorkers"))));
+ }
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Entities to be written to the Cloud Datastore must have complete keys");
+ @Test
+ public void testDeleteEntityPrimitiveDisplayData() {
+ int hintNumWorkers = 10;
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Entity>, ?> write =
+ DatastoreIO.v1()
+ .deleteEntity()
+ .withProjectId("myProject")
+ .withHintNumWorkers(hintNumWorkers);
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+ assertThat(
+ "DatastoreIO write should include the project in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("projectId")));
+ assertThat(
+ "DatastoreIO write should include the deleteEntityFn in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("deleteEntityFn")));
+ assertThat(
+ "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+ displayData,
+ hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
+ }
- upsertFn.apply(entity);
- }
+ @Test
+ public void testDeleteKeyPrimitiveDisplayData() {
+ int hintNumWorkers = 10;
+ DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
+ PTransform<PCollection<Key>, ?> write =
+ DatastoreIO.v1()
+ .deleteKey()
+ .withProjectId("myProject")
+ .withHintNumWorkers(hintNumWorkers);
+
+ Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
+ assertThat(
+ "DatastoreIO write should include the project in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("projectId")));
+ assertThat(
+ "DatastoreIO write should include the deleteKeyFn in its primitive display data",
+ displayData,
+ hasItem(hasDisplayItem("deleteKeyFn")));
+ assertThat(
+ "DatastoreIO write should include ramp-up throttling worker count hint if enabled",
+ displayData,
+ hasItem(hasDisplayItem("hintNumWorkers", hintNumWorkers)));
+ }
- @Test
- /** Test that entities with valid keys are transformed to upsert mutations. */
- public void testAddEntities() throws Exception {
- Key key = makeKey("bird", "finch").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- UpsertFn upsertFn = new UpsertFn();
+ /** Test building a Write using builder methods. */
+ @Test
+ public void testBuildWrite() throws Exception {
+ DatastoreV1.Write write = DatastoreIO.v1().write().withProjectId(PROJECT_ID);
+ assertEquals(PROJECT_ID, write.getProjectId());
+ }
- Mutation expectedMutation = makeUpsert(entity).build();
- assertEquals(expectedMutation, upsertFn.apply(entity));
- }
+ /** Test the detection of complete and incomplete keys. */
+ @Test
+ public void testHasNameOrId() {
+ Key key;
+ // Complete with name, no ancestor
+ key = makeKey("bird", "finch").build();
+ assertTrue(isValidKey(key));
+
+ // Complete with id, no ancestor
+ key = makeKey("bird", 123).build();
+ assertTrue(isValidKey(key));
+
+ // Incomplete, no ancestor
+ key = makeKey("bird").build();
+ assertFalse(isValidKey(key));
+
+ // Complete with name and ancestor
+ key = makeKey("bird", "owl").build();
+ key = makeKey(key, "bird", "horned").build();
+ assertTrue(isValidKey(key));
+
+ // Complete with id and ancestor
+ key = makeKey("bird", "owl").build();
+ key = makeKey(key, "bird", 123).build();
+ assertTrue(isValidKey(key));
+
+ // Incomplete with ancestor
+ key = makeKey("bird", "owl").build();
+ key = makeKey(key, "bird").build();
+ assertFalse(isValidKey(key));
+
+ key = makeKey().build();
+ assertFalse(isValidKey(key));
+ }
- /** Test that entities with incomplete keys cannot be deleted. */
- @Test
- public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+ /** Test that entities with incomplete keys cannot be updated. */
+ @Test
+ public void testAddEntitiesWithIncompleteKeys() throws Exception {
+ Key key = makeKey("bird").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ UpsertFn upsertFn = new UpsertFn();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Entities to be deleted from the Cloud Datastore must have complete keys");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Entities to be written to the Cloud Datastore must have complete keys");
- deleteEntityFn.apply(entity);
- }
+ upsertFn.apply(entity);
+ }
- /** Test that entities with valid keys are transformed to delete mutations. */
- @Test
- public void testDeleteEntities() throws Exception {
- Key key = makeKey("bird", "finch").build();
- Entity entity = Entity.newBuilder().setKey(key).build();
- DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
+ @Test
+ /** Test that entities with valid keys are transformed to upsert mutations. */
+ public void testAddEntities() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ UpsertFn upsertFn = new UpsertFn();
- Mutation expectedMutation = makeDelete(entity.getKey()).build();
- assertEquals(expectedMutation, deleteEntityFn.apply(entity));
- }
+ Mutation expectedMutation = makeUpsert(entity).build();
+ assertEquals(expectedMutation, upsertFn.apply(entity));
+ }
- /** Test that incomplete keys cannot be deleted. */
- @Test
- public void testDeleteIncompleteKeys() throws Exception {
- Key key = makeKey("bird").build();
- DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+ /** Test that entities with incomplete keys cannot be deleted. */
+ @Test
+ public void testDeleteEntitiesWithIncompleteKeys() throws Exception {
+ Key key = makeKey("bird").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
- thrown.expect(IllegalArgumentException.class);
- thrown.expectMessage("Keys to be deleted from the Cloud Datastore must be complete");
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage(
+ "Entities to be deleted from the Cloud Datastore must have complete keys");
- deleteKeyFn.apply(key);
- }
+ deleteEntityFn.apply(entity);
+ }
- /** Test that valid keys are transformed to delete mutations. */
- @Test
- public void testDeleteKeys() {
- Key key = makeKey("bird", "finch").build();
- DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
+ /** Test that entities with valid keys are transformed to delete mutations. */
+ @Test
+ public void testDeleteEntities() throws Exception {
+ Key key = makeKey("bird", "finch").build();
+ Entity entity = Entity.newBuilder().setKey(key).build();
+ DeleteEntityFn deleteEntityFn = new DeleteEntityFn();
- Mutation expectedMutation = makeDelete(key).build();
- assertEquals(expectedMutation, deleteKeyFn.apply(key));
- }
+ Mutation expectedMutation = makeDelete(entity.getKey()).build();
+ assertEquals(expectedMutation, deleteEntityFn.apply(entity));
+ }
- @Test
- public void testDatastoreWriteFnDisplayData() {
- DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null);
- DisplayData displayData = DisplayData.from(datastoreWriter);
- assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
- }
+ /** Test that incomplete keys cannot be deleted. */
+ @Test
+ public void testDeleteIncompleteKeys() throws Exception {
+ Key key = makeKey("bird").build();
+ DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
- /** Tests {@link DatastoreWriterFn} with entities less than one batch. */
- @Test
- public void testDatatoreWriterFnWithOneBatch() throws Exception {
- datastoreWriterFnTest(100);
- verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
- }
+ thrown.expect(IllegalArgumentException.class);
+ thrown.expectMessage("Keys to be deleted from the Cloud Datastore must be complete");
- /** Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple. */
- @Test
- public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
- datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
- verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 5);
- }
+ deleteKeyFn.apply(key);
+ }
- /**
- * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
- * write batch size.
- */
- @Test
- public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
- datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
- verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
- }
+ /** Test that valid keys are transformed to delete mutations. */
+ @Test
+ public void testDeleteKeys() {
+ Key key = makeKey("bird", "finch").build();
+ DeleteKeyFn deleteKeyFn = new DeleteKeyFn();
- // A helper method to test DatastoreWriterFn for various batch sizes.
- private void datastoreWriterFnTest(int numMutations) throws Exception {
- // Create the requested number of mutations.
- List<Mutation> mutations = new ArrayList<>(numMutations);
- for (int i = 0; i < numMutations; ++i) {
- mutations.add(
- makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+ Mutation expectedMutation = makeDelete(key).build();
+ assertEquals(expectedMutation, deleteKeyFn.apply(key));
}
- DatastoreWriterFn datastoreWriter =
- new DatastoreWriterFn(
- StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
- DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- doFnTester.processBundle(mutations);
-
- int start = 0;
- while (start < numMutations) {
- int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START);
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- commitRequest.addAllMutations(mutations.subList(start, end));
- // Verify all the batch requests were made with the expected mutations.
- verify(mockDatastore, times(1)).commit(commitRequest.build());
- start = end;
+ @Test
+ public void testDatastoreWriteFnDisplayData() {
+ DatastoreWriterFn datastoreWriter = new DatastoreWriterFn(PROJECT_ID, null);
+ DisplayData displayData = DisplayData.from(datastoreWriter);
+ assertThat(displayData, hasDisplayItem("projectId", PROJECT_ID));
}
- }
- /**
- * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
- */
- @Test
- public void testDatatoreWriterFnWithLargeEntities() throws Exception {
- List<Mutation> mutations = new ArrayList<>();
- int entitySize = 0;
- for (int i = 0; i < 12; ++i) {
- Entity entity =
- Entity.newBuilder()
- .setKey(makeKey("key" + i, i + 1))
- .putProperties(
- "long",
- makeValue(new String(new char[900_000])).setExcludeFromIndexes(true).build())
- .build();
- entitySize = entity.getSerializedSize(); // Take the size of any one entity.
- mutations.add(makeUpsert(entity).build());
+ /** Tests {@link DatastoreWriterFn} with entities less than one batch. */
+ @Test
+ public void testDatatoreWriterFnWithOneBatch() throws Exception {
+ datastoreWriterFnTest(100);
+ verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
}
- DatastoreWriterFn datastoreWriter =
- new DatastoreWriterFn(
- StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
- DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- doFnTester.processBundle(mutations);
-
- // This test is over-specific currently; it requires that we split the 12 entity writes into 3
- // requests, but we only need each CommitRequest to be less than 10MB in size.
- int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
- int start = 0;
- while (start < mutations.size()) {
- int end = Math.min(mutations.size(), start + entitiesPerRpc);
- CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
- commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
- commitRequest.addAllMutations(mutations.subList(start, end));
- // Verify all the batch requests were made with the expected mutations.
- verify(mockDatastore).commit(commitRequest.build());
- start = end;
+ /**
+ * Tests {@link DatastoreWriterFn} with entities of more than one batches, but not a multiple.
+ */
+ @Test
+ public void testDatatoreWriterFnWithMultipleBatches() throws Exception {
+ datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 3 + 100);
+ verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 5);
}
- }
- /** Tests {@link DatastoreWriterFn} with a failed request which is retried. */
- @Test
- public void testDatatoreWriterFnRetriesErrors() throws Exception {
- List<Mutation> mutations = new ArrayList<>();
- int numRpcs = 2;
- for (int i = 0; i < DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
- mutations.add(
- makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+ /**
+ * Tests {@link DatastoreWriterFn} with entities of several batches, using an exact multiple of
+ * write batch size.
+ */
+ @Test
+ public void testDatatoreWriterFnWithBatchesExactMultiple() throws Exception {
+ datastoreWriterFnTest(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * 2);
+ verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
}
- CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
- when(mockDatastore.commit(any(CommitRequest.class)))
- .thenReturn(successfulCommit)
- .thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
- .thenReturn(successfulCommit);
-
- DatastoreWriterFn datastoreWriter =
- new DatastoreWriterFn(
- StaticValueProvider.of(PROJECT_ID), null, mockDatastoreFactory, new FakeWriteBatcher());
- DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- doFnTester.processBundle(mutations);
- verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
- verifyMetricWasSet("BatchDatastoreWrite", "unknown", "", 1);
- }
-
- /**
- * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
- * query.
- */
- @Test
- public void testEstimatedSizeBytes() throws Exception {
- long entityBytes = 100L;
- // In seconds
- long timestamp = 1234L;
-
- RunQueryRequest latestTimestampRequest =
- makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE);
- RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
- // Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
- RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
- when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
- when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
-
- assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE));
- verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
- verify(mockDatastore, times(1)).runQuery(statRequest);
- }
-
- /** Tests {@link SplitQueryFn} when number of query splits is specified. */
- @Test
- public void testSplitQueryFnWithNumSplits() throws Exception {
- int numSplits = 100;
- when(mockQuerySplitter.getSplits(
- eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
- .thenReturn(splitQuery(QUERY, numSplits));
+ // A helper method to test DatastoreWriterFn for various batch sizes.
+ private void datastoreWriterFnTest(int numMutations) throws Exception {
+ // Create the requested number of mutations.
+ List<Mutation> mutations = new ArrayList<>(numMutations);
+ for (int i = 0; i < numMutations; ++i) {
+ mutations.add(
+ makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+ }
+
+ DatastoreWriterFn datastoreWriter =
+ new DatastoreWriterFn(
+ StaticValueProvider.of(PROJECT_ID),
+ null,
+ mockDatastoreFactory,
+ new FakeWriteBatcher());
+ DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(mutations);
+
+ int start = 0;
+ while (start < numMutations) {
+ int end = Math.min(numMutations, start + DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START);
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ commitRequest.addAllMutations(mutations.subList(start, end));
+ // Verify all the batch requests were made with the expected mutations.
+ verify(mockDatastore, times(1)).commit(commitRequest.build());
+ start = end;
+ }
+ }
- SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
- DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
/**
- * Although Datastore client is marked transient in {@link SplitQueryFn}, when injected through
- * mock factory using a when clause for unit testing purposes, it is not serializable because it
- * doesn't have a no-arg constructor. Thus disabling the cloning to prevent the doFn from being
- * serialized.
+ * Tests {@link DatastoreWriterFn} with large entities that need to be split into more batches.
*/
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<Query> queries = doFnTester.processBundle(QUERY);
-
- assertEquals(queries.size(), numSplits);
+ @Test
+ public void testDatatoreWriterFnWithLargeEntities() throws Exception {
+ List<Mutation> mutations = new ArrayList<>();
+ int entitySize = 0;
+ for (int i = 0; i < 12; ++i) {
+ Entity entity =
+ Entity.newBuilder()
+ .setKey(makeKey("key" + i, i + 1))
+ .putProperties(
+ "long",
+ makeValue(new String(new char[900_000])).setExcludeFromIndexes(true).build())
+ .build();
+ entitySize = entity.getSerializedSize(); // Take the size of any one entity.
+ mutations.add(makeUpsert(entity).build());
+ }
+
+ DatastoreWriterFn datastoreWriter =
+ new DatastoreWriterFn(
+ StaticValueProvider.of(PROJECT_ID),
+ null,
+ mockDatastoreFactory,
+ new FakeWriteBatcher());
+ DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(mutations);
+
+ // This test is over-specific currently; it requires that we split the 12 entity writes into 3
+ // requests, but we only need each CommitRequest to be less than 10MB in size.
+ int entitiesPerRpc = DATASTORE_BATCH_UPDATE_BYTES_LIMIT / entitySize;
+ int start = 0;
+ while (start < mutations.size()) {
+ int end = Math.min(mutations.size(), start + entitiesPerRpc);
+ CommitRequest.Builder commitRequest = CommitRequest.newBuilder();
+ commitRequest.setMode(CommitRequest.Mode.NON_TRANSACTIONAL);
+ commitRequest.addAllMutations(mutations.subList(start, end));
+ // Verify all the batch requests were made with the expected mutations.
+ verify(mockDatastore).commit(commitRequest.build());
+ start = end;
+ }
+ }
- // Confirms that sub-queries are not equal to original when there is more than one split.
- for (Query subQuery : queries) {
- assertNotEquals(subQuery, QUERY);
+ /** Tests {@link DatastoreWriterFn} with a failed request which is retried. */
+ @Test
+ public void testDatatoreWriterFnRetriesErrors() throws Exception {
+ List<Mutation> mutations = new ArrayList<>();
+ int numRpcs = 2;
+ for (int i = 0; i < DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START * numRpcs; ++i) {
+ mutations.add(
+ makeUpsert(Entity.newBuilder().setKey(makeKey("key" + i, i + 1)).build()).build());
+ }
+
+ CommitResponse successfulCommit = CommitResponse.getDefaultInstance();
+ when(mockDatastore.commit(any(CommitRequest.class)))
+ .thenReturn(successfulCommit)
+ .thenThrow(new DatastoreException("commit", Code.DEADLINE_EXCEEDED, "", null))
+ .thenReturn(successfulCommit);
+
+ DatastoreWriterFn datastoreWriter =
+ new DatastoreWriterFn(
+ StaticValueProvider.of(PROJECT_ID),
+ null,
+ mockDatastoreFactory,
+ new FakeWriteBatcher());
+ DoFnTester<Mutation, Void> doFnTester = DoFnTester.of(datastoreWriter);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(mutations);
+ verifyMetricWasSet("BatchDatastoreWrite", "ok", "", 2);
+ verifyMetricWasSet("BatchDatastoreWrite", "unknown", "", 1);
}
- verify(mockQuerySplitter, times(1))
- .getSplits(eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
- verifyZeroInteractions(mockDatastore);
- }
+ /** Test options. * */
+ public interface RuntimeTestOptions extends PipelineOptions {
+ ValueProvider<String> getDatastoreProject();
- /** Tests {@link SplitQueryFn} when no query splits is specified. */
- @Test
- public void testSplitQueryFnWithoutNumSplits() throws Exception {
- // Force SplitQueryFn to compute the number of query splits
- int numSplits = 0;
- int expectedNumSplits = 20;
- long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
- // In seconds
- long timestamp = 1234L;
-
- RunQueryRequest latestTimestampRequest =
- makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE);
- RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
-
- // Per Kind statistics request and response
- RunQueryRequest statRequest = makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE);
- RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
-
- when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
- when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
- when(mockQuerySplitter.getSplits(
- eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
- .thenReturn(splitQuery(QUERY, expectedNumSplits));
-
- SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, numSplits, mockDatastoreFactory);
- DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<Query> queries = doFnTester.processBundle(QUERY);
+ void setDatastoreProject(ValueProvider<String> value);
- assertEquals(expectedNumSplits, queries.size());
- verify(mockQuerySplitter, times(1))
- .getSplits(eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
- verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
- verify(mockDatastore, times(1)).runQuery(statRequest);
- }
+ ValueProvider<String> getGqlQuery();
- /** Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. */
- @Test
- public void testSplitQueryFnWithQueryLimit() throws Exception {
- Query queryWithLimit = QUERY.toBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
+ void setGqlQuery(ValueProvider<String> value);
- SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, mockDatastoreFactory);
- DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- List<Query> queries = doFnTester.processBundle(queryWithLimit);
+ ValueProvider<String> getNamespace();
- assertEquals(1, queries.size());
- verifyNoMoreInteractions(mockDatastore);
- verifyNoMoreInteractions(mockQuerySplitter);
- }
+ void setNamespace(ValueProvider<String> value);
+ }
- /** Tests {@link ReadFn} with a query limit less than one batch. */
- @Test
- public void testReadFnWithOneBatch() throws Exception {
- readFnTest(5);
- verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
- }
+ /**
+ * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction
+ * time when built with {@link DatastoreV1.Read#withQuery(Query)}.
+ */
+ @Test
+ public void testRuntimeOptionsNotCalledInApplyQuery() {
+ RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+ Pipeline pipeline = TestPipeline.create(options);
+ pipeline
+ .apply(
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(options.getDatastoreProject())
+ .withQuery(QUERY)
+ .withNamespace(options.getNamespace()))
+ .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
+ }
- /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */
- @Test
- public void testReadFnWithMultipleBatches() throws Exception {
- readFnTest(QUERY_BATCH_LIMIT + 5);
- verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 2);
- }
+ /**
+ * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction
+ * time when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}.
+ */
+ @Test
+ public void testRuntimeOptionsNotCalledInApplyGqlQuery() {
+ RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
+ Pipeline pipeline = TestPipeline.create(options);
+ pipeline
+ .apply(
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(options.getDatastoreProject())
+ .withLiteralGqlQuery(options.getGqlQuery()))
+ .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
+ }
- /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
- @Test
- public void testReadFnWithBatchesExactMultiple() throws Exception {
- readFnTest(5 * QUERY_BATCH_LIMIT);
- verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 5);
- }
+ @Test
+ public void testWriteBatcherWithoutData() {
+ DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+ writeBatcher.start();
+ assertEquals(
+ DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
+ }
- /** Tests that {@link ReadFn} retries after an error. */
- @Test
- public void testReadFnRetriesErrors() throws Exception {
- // An empty query to read entities.
- Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
+ @Test
+ public void testWriteBatcherFastQueries() {
+ DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+ writeBatcher.start();
+ writeBatcher.addRequestLatency(0, 1000, 200);
+ writeBatcher.addRequestLatency(0, 1000, 200);
+ assertEquals(
+ DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
+ }
- // Use mockResponseForQuery to generate results.
- when(mockDatastore.runQuery(any(RunQueryRequest.class)))
- .thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
- .thenAnswer(
- invocationOnMock -> {
- Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
- return mockResponseForQuery(q);
- });
+ @Test
+ public void testWriteBatcherSlowQueries() {
+ DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+ writeBatcher.start();
+ writeBatcher.addRequestLatency(0, 10000, 200);
+ writeBatcher.addRequestLatency(0, 10000, 200);
+ assertEquals(120, writeBatcher.nextBatchSize(0));
+ }
- ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
- DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
- doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
- doFnTester.processBundle(query);
- verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
- verifyMetricWasSet("BatchDatastoreRead", "unknown", NAMESPACE, 1);
- }
+ @Test
+ public void testWriteBatcherSizeNotBelowMinimum() {
+ DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+ writeBatcher.start();
+ writeBatcher.addRequestLatency(0, 75000, 50);
+ writeBatcher.addRequestLatency(0, 75000, 50);
+ assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
+ }
- @Test
- public void testTranslateGqlQueryWithLimit() throws Exception {
- String gql = "SELECT * from DummyKind LIMIT 10";
- String gqlWithZeroLimit = gql + " LIMIT 0";
- GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
- GqlQuery gqlQueryWithZeroLimit =
- GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
- RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace());
- RunQueryRequest gqlRequestWithZeroLimit =
- makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
- when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
- .thenThrow(
- new DatastoreException(
- "runQuery",
- Code.INVALID_ARGUMENT,
- "invalid query",
- // dummy
- new RuntimeException()));
- when(mockDatastore.runQuery(gqlRequest))
- .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
- assertEquals(
- translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), QUERY);
- verify(mockDatastore, times(1)).runQuery(gqlRequest);
- verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+ @Test
+ public void testWriteBatcherSlidingWindow() {
+ DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
+ writeBatcher.start();
+ writeBatcher.addRequestLatency(0, 30000, 50);
+ writeBatcher.addRequestLatency(50000, 8000, 200);
+ writeBatcher.addRequestLatency(100000, 8000, 200);
+ assertEquals(150, writeBatcher.nextBatchSize(150000));
+ }
}
- @Test
- public void testTranslateGqlQueryWithNoLimit() throws Exception {
- String gql = "SELECT * from DummyKind";
- String gqlWithZeroLimit = gql + " LIMIT 0";
- GqlQuery gqlQueryWithZeroLimit =
- GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
- RunQueryRequest gqlRequestWithZeroLimit =
- makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
- when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
- .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
- assertEquals(
- translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace()), QUERY);
- verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
- }
+ @RunWith(Parameterized.class)
+ public static class ParameterizedTests extends DatastoreV1Test {
+ @Parameter(0)
+ public Instant readTime;
- @Test
- public void testTranslateGqlQueryWithException() throws Exception {
- String gql = "SELECT * from DummyKind";
- String gqlWithZeroLimit = gql + " LIMIT 0";
- GqlQuery gqlQueryWithZeroLimit =
- GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
- RunQueryRequest gqlRequestWithZeroLimit =
- makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace());
- when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
- .thenThrow(new RuntimeException("TestException"));
-
- thrown.expect(RuntimeException.class);
- thrown.expectMessage("TestException");
- translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace());
- }
+ @Parameter(1)
+ public Timestamp readTimeProto;
- /** Test options. * */
- public interface RuntimeTestOptions extends PipelineOptions {
- ValueProvider<String> getDatastoreProject();
+ @Parameters(name = "readTime = {0}, readTimeProto = {1}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[] {null, null}, new Object[] {TIMESTAMP, TIMESTAMP_PROTO});
+ }
- void setDatastoreProject(ValueProvider<String> value);
+ /**
+ * Tests {@link DatastoreV1.Read#getEstimatedSizeBytes} to fetch and return estimated size for a
+ * query.
+ */
+ @Test
+ public void testEstimatedSizeBytes() throws Exception {
+ long entityBytes = 100L;
+ // Timestamp value, in seconds.
+ long timestamp = 1234L;
+
+ RunQueryRequest latestTimestampRequest =
+ makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE, readTime);
+ RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
+ // Per-kind statistics request and response.
+ RunQueryRequest statRequest =
+ makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE, readTime);
+ RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+ when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
+ when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
+
+ assertEquals(entityBytes, getEstimatedSizeBytes(mockDatastore, QUERY, NAMESPACE, readTime));
+ verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
+ verify(mockDatastore, times(1)).runQuery(statRequest);
+ }
- ValueProvider<String> getGqlQuery();
+ /** Tests {@link SplitQueryFn} when the number of query splits is specified. */
+ @Test
+ public void testSplitQueryFnWithNumSplits() throws Exception {
+ int numSplits = 100;
+
+ when(mockQuerySplitter.getSplits(
+ eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class)))
+ .thenReturn(splitQuery(QUERY, numSplits));
+ when(mockQuerySplitter.getSplits(
+ eq(QUERY),
+ any(PartitionId.class),
+ eq(numSplits),
+ any(Datastore.class),
+ eq(readTimeProto)))
+ .thenReturn(splitQuery(QUERY, numSplits));
+
+ SplitQueryFn splitQueryFn =
+ new SplitQueryFn(V_1_OPTIONS, numSplits, readTime, mockDatastoreFactory);
+ DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
+ /*
+ * Although the Datastore client is marked transient in SplitQueryFn, when injected through
+ * the mock factory using a when clause for unit testing purposes, it is not serializable
+ * because it doesn't have a no-arg constructor. Thus disabling the cloning to prevent the
+ * doFn from being serialized.
+ */
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<Query> queries = doFnTester.processBundle(QUERY);
+
+ assertEquals(numSplits, queries.size());
+
+ // Confirms that sub-queries are not equal to original when there is more than one split.
+ for (Query subQuery : queries) {
+ assertNotEquals(QUERY, subQuery);
+ }
+
+ if (readTime == null) {
+ verify(mockQuerySplitter, times(1))
+ .getSplits(eq(QUERY), any(PartitionId.class), eq(numSplits), any(Datastore.class));
+ } else {
+ verify(mockQuerySplitter, times(1))
+ .getSplits(
+ eq(QUERY),
+ any(PartitionId.class),
+ eq(numSplits),
+ any(Datastore.class),
+ eq(readTimeProto));
+ }
+ verifyZeroInteractions(mockDatastore);
+ }
- void setGqlQuery(ValueProvider<String> value);
+ /** Tests {@link SplitQueryFn} when the number of query splits is not specified. */
+ @Test
+ public void testSplitQueryFnWithoutNumSplits() throws Exception {
+ // Passing 0 forces SplitQueryFn to compute the number of query splits itself.
+ int numSplits = 0;
+ int expectedNumSplits = 20;
+ long entityBytes = expectedNumSplits * DEFAULT_BUNDLE_SIZE_BYTES;
+ // Timestamp value, in seconds.
+ long timestamp = 1234L;
+
+ RunQueryRequest latestTimestampRequest =
+ makeRequest(makeLatestTimestampQuery(NAMESPACE), NAMESPACE, readTime);
+ RunQueryResponse latestTimestampResponse = makeLatestTimestampResponse(timestamp);
+
+ // Per-kind statistics request and response.
+ RunQueryRequest statRequest =
+ makeRequest(makeStatKindQuery(NAMESPACE, timestamp), NAMESPACE, readTime);
+ RunQueryResponse statResponse = makeStatKindResponse(entityBytes);
+
+ when(mockDatastore.runQuery(latestTimestampRequest)).thenReturn(latestTimestampResponse);
+ when(mockDatastore.runQuery(statRequest)).thenReturn(statResponse);
+ when(mockQuerySplitter.getSplits(
+ eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class)))
+ .thenReturn(splitQuery(QUERY, expectedNumSplits));
+ when(mockQuerySplitter.getSplits(
+ eq(QUERY),
+ any(PartitionId.class),
+ eq(expectedNumSplits),
+ any(Datastore.class),
+ eq(readTimeProto)))
+ .thenReturn(splitQuery(QUERY, expectedNumSplits));
+
+ SplitQueryFn splitQueryFn =
+ new SplitQueryFn(V_1_OPTIONS, numSplits, readTime, mockDatastoreFactory);
+ DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<Query> queries = doFnTester.processBundle(QUERY);
+
+ assertEquals(expectedNumSplits, queries.size());
+ if (readTime == null) {
+ verify(mockQuerySplitter, times(1))
+ .getSplits(
+ eq(QUERY), any(PartitionId.class), eq(expectedNumSplits), any(Datastore.class));
+ } else {
+ verify(mockQuerySplitter, times(1))
+ .getSplits(
+ eq(QUERY),
+ any(PartitionId.class),
+ eq(expectedNumSplits),
+ any(Datastore.class),
+ eq(readTimeProto));
+ }
+ verify(mockDatastore, times(1)).runQuery(latestTimestampRequest);
+ verify(mockDatastore, times(1)).runQuery(statRequest);
+ }
- ValueProvider<String> getNamespace();
+ /** Tests {@link DatastoreV1.Read.SplitQueryFn} when the query has a user specified limit. */
+ @Test
+ public void testSplitQueryFnWithQueryLimit() throws Exception {
+ Query queryWithLimit =
+ QUERY.toBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
- void setNamespace(ValueProvider<String> value);
- }
+ SplitQueryFn splitQueryFn = new SplitQueryFn(V_1_OPTIONS, 10, readTime, mockDatastoreFactory);
+ DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ List<Query> queries = doFnTester.processBundle(queryWithLimit);
- /**
- * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time
- * when built with {@link DatastoreV1.Read#withQuery(Query)}.
- */
- @Test
- public void testRuntimeOptionsNotCalledInApplyQuery() {
- RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
- Pipeline pipeline = TestPipeline.create(options);
- pipeline
- .apply(
- DatastoreIO.v1()
- .read()
- .withProjectId(options.getDatastoreProject())
- .withQuery(QUERY)
- .withNamespace(options.getNamespace()))
- .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
- }
+ assertEquals(1, queries.size());
+ verifyNoMoreInteractions(mockDatastore);
+ verifyNoMoreInteractions(mockQuerySplitter);
+ }
- /**
- * Test to ensure that {@link ValueProvider} values are not accessed at pipeline construction time
- * when built with {@link DatastoreV1.Read#withLiteralGqlQuery(String)}.
- */
- @Test
- public void testRuntimeOptionsNotCalledInApplyGqlQuery() {
- RuntimeTestOptions options = PipelineOptionsFactory.as(RuntimeTestOptions.class);
- Pipeline pipeline = TestPipeline.create(options);
- pipeline
- .apply(
- DatastoreIO.v1()
- .read()
- .withProjectId(options.getDatastoreProject())
- .withLiteralGqlQuery(options.getGqlQuery()))
- .apply(DatastoreIO.v1().write().withProjectId(options.getDatastoreProject()));
- }
+ /** Tests {@link ReadFn} with a query limit less than one batch. */
+ @Test
+ public void testReadFnWithOneBatch() throws Exception {
+ readFnTest(5, readTime);
+ verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
+ }
- @Test
- public void testWriteBatcherWithoutData() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_START, writeBatcher.nextBatchSize(0));
- }
+ /** Tests {@link ReadFn} with a query limit more than one batch, and not a multiple. */
+ @Test
+ public void testReadFnWithMultipleBatches() throws Exception {
+ readFnTest(QUERY_BATCH_LIMIT + 5, readTime);
+ verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 2);
+ }
- @Test
- public void testWriteBatcherFastQueries() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 1000, 200);
- writeBatcher.addRequestLatency(0, 1000, 200);
- assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_LIMIT, writeBatcher.nextBatchSize(0));
- }
+ /** Tests {@link ReadFn} for several batches, using an exact multiple of batch size results. */
+ @Test
+ public void testReadFnWithBatchesExactMultiple() throws Exception {
+ readFnTest(5 * QUERY_BATCH_LIMIT, readTime);
+ verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 5);
+ }
- @Test
- public void testWriteBatcherSlowQueries() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 10000, 200);
- writeBatcher.addRequestLatency(0, 10000, 200);
- assertEquals(120, writeBatcher.nextBatchSize(0));
- }
+ /** Tests that {@link ReadFn} retries after an error. */
+ @Test
+ public void testReadFnRetriesErrors() throws Exception {
+ // An empty query to read entities.
+ Query query = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(1)).build();
+
+ // Use mockResponseForQuery to generate results.
+ when(mockDatastore.runQuery(any(RunQueryRequest.class)))
+ .thenThrow(new DatastoreException("RunQuery", Code.DEADLINE_EXCEEDED, "", null))
+ .thenAnswer(
+ invocationOnMock -> {
+ Query q = ((RunQueryRequest) invocationOnMock.getArguments()[0]).getQuery();
+ return mockResponseForQuery(q);
+ });
+
+ ReadFn readFn = new ReadFn(V_1_OPTIONS, readTime, mockDatastoreFactory);
+ DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
+ doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
+ doFnTester.processBundle(query);
+ verifyMetricWasSet("BatchDatastoreRead", "ok", NAMESPACE, 1);
+ verifyMetricWasSet("BatchDatastoreRead", "unknown", NAMESPACE, 1);
+ }
- @Test
- public void testWriteBatcherSizeNotBelowMinimum() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 75000, 50);
- writeBatcher.addRequestLatency(0, 75000, 50);
- assertEquals(DatastoreV1.DATASTORE_BATCH_UPDATE_ENTITIES_MIN, writeBatcher.nextBatchSize(0));
- }
+ @Test
+ public void testTranslateGqlQueryWithLimit() throws Exception {
+ String gql = "SELECT * from DummyKind LIMIT 10";
+ String gqlWithZeroLimit = gql + " LIMIT 0";
+ GqlQuery gqlQuery = GqlQuery.newBuilder().setQueryString(gql).setAllowLiterals(true).build();
+ GqlQuery gqlQueryWithZeroLimit =
+ GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
+ // The "LIMIT 0" probe fails with INVALID_ARGUMENT, so the original GQL must be run instead.
+ RunQueryRequest gqlRequest = makeRequest(gqlQuery, V_1_OPTIONS.getNamespace(), readTime);
+ RunQueryRequest gqlRequestWithZeroLimit =
+ makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime);
+ when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+ .thenThrow(
+ new DatastoreException(
+ "runQuery",
+ Code.INVALID_ARGUMENT,
+ "invalid query",
+ // dummy cause
+ new RuntimeException()));
+ when(mockDatastore.runQuery(gqlRequest))
+ .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
+ Query translated =
+ translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime);
+ assertEquals(QUERY, translated);
+ verify(mockDatastore, times(1)).runQuery(gqlRequest);
+ verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+ }
+
+ @Test
+ public void testTranslateGqlQueryWithNoLimit() throws Exception {
+ String gql = "SELECT * from DummyKind";
+ String gqlWithZeroLimit = gql + " LIMIT 0";
+ GqlQuery gqlQueryWithZeroLimit =
+ GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
+ // The "LIMIT 0" probe succeeds, so its response alone provides the translated query.
+ RunQueryRequest gqlRequestWithZeroLimit =
+ makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime);
+ when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+ .thenReturn(RunQueryResponse.newBuilder().setQuery(QUERY).build());
+ Query translated =
+ translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime);
+ assertEquals(QUERY, translated);
+ verify(mockDatastore, times(1)).runQuery(gqlRequestWithZeroLimit);
+ }
- @Test
- public void testWriteBatcherSlidingWindow() {
- DatastoreV1.WriteBatcher writeBatcher = new DatastoreV1.WriteBatcherImpl();
- writeBatcher.start();
- writeBatcher.addRequestLatency(0, 30000, 50);
- writeBatcher.addRequestLatency(50000, 8000, 200);
- writeBatcher.addRequestLatency(100000, 8000, 200);
- assertEquals(150, writeBatcher.nextBatchSize(150000));
+ @Test
+ public void testTranslateGqlQueryWithException() throws Exception {
+ String gql = "SELECT * from DummyKind";
+ String gqlWithZeroLimit = gql + " LIMIT 0";
+ GqlQuery gqlQueryWithZeroLimit =
+ GqlQuery.newBuilder().setQueryString(gqlWithZeroLimit).setAllowLiterals(true).build();
+ RunQueryRequest gqlRequestWithZeroLimit =
+ makeRequest(gqlQueryWithZeroLimit, V_1_OPTIONS.getNamespace(), readTime);
+ when(mockDatastore.runQuery(gqlRequestWithZeroLimit))
+ .thenThrow(new RuntimeException("TestException"));
+
+ thrown.expect(RuntimeException.class);
+ thrown.expectMessage("TestException");
+ translateGqlQueryWithLimitCheck(gql, mockDatastore, V_1_OPTIONS.getNamespace(), readTime);
+ }
}
/** Helper Methods */
@@ -963,7 +1083,7 @@ public class DatastoreV1Test {
}
/** Helper function to run a test reading from a {@link ReadFn}. */
- private void readFnTest(int numEntities) throws Exception {
+ protected void readFnTest(int numEntities, Instant readTime) throws Exception {
// An empty query to read entities.
Query query =
Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(numEntities)).build();
@@ -976,7 +1096,7 @@ public class DatastoreV1Test {
return mockResponseForQuery(q);
});
- ReadFn readFn = new ReadFn(V_1_OPTIONS, mockDatastoreFactory);
+ ReadFn readFn = new ReadFn(V_1_OPTIONS, readTime, mockDatastoreFactory);
DoFnTester<Query, Entity> doFnTester = DoFnTester.of(readFn);
/**
* Although Datastore client is marked transient in {@link ReadFn}, when injected through mock
@@ -987,10 +1107,20 @@ public class DatastoreV1Test {
doFnTester.setCloningBehavior(CloningBehavior.DO_NOT_CLONE);
List<Entity> entities = doFnTester.processBundle(query);
+ ArgumentCaptor<RunQueryRequest> requestCaptor = ArgumentCaptor.forClass(RunQueryRequest.class);
+
int expectedNumCallsToRunQuery = (int) Math.ceil((double) numEntities / QUERY_BATCH_LIMIT);
- verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(any(RunQueryRequest.class));
+ verify(mockDatastore, times(expectedNumCallsToRunQuery)).runQuery(requestCaptor.capture());
// Validate the number of results.
assertEquals(numEntities, entities.size());
+ // Validate read time.
+ RunQueryRequest request = requestCaptor.getValue();
+ if (readTime != null) {
+ assertEquals(
+ readTime.getMillis(), Timestamps.toMillis(request.getReadOptions().getReadTime()));
+ } else {
+ assertFalse(request.hasReadOptions());
+ }
}
/** Builds a per-kind statistics response with the given entity size. */
@@ -1050,7 +1180,7 @@ public class DatastoreV1Test {
}
/** Generate dummy query splits. */
- private List<Query> splitQuery(Query query, int numSplits) {
+ private static List<Query> splitQuery(Query query, int numSplits) {
List<Query> queries = new ArrayList<>();
int offsetOfOriginal = query.getOffset();
for (int i = 0; i < numSplits; i++) {
@@ -1082,7 +1212,8 @@ public class DatastoreV1Test {
}
}
- private void verifyMetricWasSet(String method, String status, String namespace, long count) {
+ private static void verifyMetricWasSet(
+ String method, String status, String namespace, long count) {
// Verify the metric as reported.
HashMap<String, String> labels = new HashMap<>();
labels.put(MonitoringInfoConstants.Labels.PTRANSFORM, "");
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
index 6d0bd52f26d..ea00821f360 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
@@ -26,6 +26,8 @@ import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
import org.apache.beam.sdk.transforms.DoFnTester;
import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -50,6 +52,8 @@ import org.junit.runners.JUnit4;
*/
@RunWith(JUnit4.class)
public class SplitQueryFnIT {
+ private Instant readTime = Instant.now().minus(Duration.standardSeconds(10));
+
/** Tests {@link SplitQueryFn} to generate expected number of splits for a large dataset. */
@Test
public void testSplitQueryFnWithLargeDataset() throws Exception {
@@ -59,7 +63,8 @@ public class SplitQueryFnIT {
// Num splits is computed based on the entity_bytes size of the input_sort_1G kind reported by
// Datastore stats.
int expectedNumSplits = 32;
- testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+ testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, null);
+ testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, readTime);
}
/** Tests {@link SplitQueryFn} to fallback to NUM_QUERY_SPLITS_MIN for a small dataset. */
@@ -69,17 +74,23 @@ public class SplitQueryFnIT {
String kind = "shakespeare";
String namespace = null;
int expectedNumSplits = NUM_QUERY_SPLITS_MIN;
- testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+ testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, null);
+ testSplitQueryFn(projectId, kind, namespace, expectedNumSplits, readTime);
}
/** A helper method to test {@link SplitQueryFn} to generate the expected number of splits. */
private void testSplitQueryFn(
- String projectId, String kind, @Nullable String namespace, int expectedNumSplits)
+ String projectId,
+ String kind,
+ @Nullable String namespace,
+ int expectedNumSplits,
+ @Nullable Instant readTime)
throws Exception {
Query.Builder query = Query.newBuilder();
query.addKindBuilder().setName(kind);
- SplitQueryFn splitQueryFn = new SplitQueryFn(V1Options.from(projectId, namespace, null), 0);
+ SplitQueryFn splitQueryFn =
+ new SplitQueryFn(V1Options.from(projectId, namespace, null), 0, readTime);
DoFnTester<Query, Query> doFnTester = DoFnTester.of(splitQueryFn);
List<Query> queries = doFnTester.processBundle(query.build());
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
index 55b53b3f0c9..7d6fc577038 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/V1ReadIT.java
@@ -36,6 +36,7 @@ import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Instant;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -48,7 +49,9 @@ public class V1ReadIT {
private V1TestOptions options;
private String project;
private String ancestor;
- private final long numEntities = 1000;
+ private final long numEntitiesBeforeReadTime = 600;
+ private final long totalNumEntities = 1000;
+ private Instant readTime;
@Before
public void setup() throws Exception {
@@ -57,7 +60,15 @@ public class V1ReadIT {
project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
ancestor = UUID.randomUUID().toString();
// Create entities and write them to datastore
- writeEntitiesToDatastore(options, project, ancestor, numEntities);
+ writeEntitiesToDatastore(options, project, ancestor, 0, numEntitiesBeforeReadTime);
+
+ Thread.sleep(1000);
+ readTime = Instant.now();
+ Thread.sleep(1000);
+
+ long moreEntitiesToWrite = totalNumEntities - numEntitiesBeforeReadTime;
+ writeEntitiesToDatastore(
+ options, project, ancestor, numEntitiesBeforeReadTime, moreEntitiesToWrite);
}
@After
@@ -77,6 +88,7 @@ public class V1ReadIT {
Query query =
V1TestUtil.makeAncestorKindQuery(options.getKind(), options.getNamespace(), ancestor);
+ // Read entities without readTime.
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
@@ -88,8 +100,23 @@ public class V1ReadIT {
Pipeline p = Pipeline.create(options);
PCollection<Long> count = p.apply(read).apply(Count.globally());
- PAssert.thatSingleton(count).isEqualTo(numEntities);
+ PAssert.thatSingleton(count).isEqualTo(totalNumEntities);
p.run();
+
+ // Read entities with readTime.
+ DatastoreV1.Read snapshotRead =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(project)
+ .withQuery(query)
+ .withNamespace(options.getNamespace())
+ .withReadTime(readTime);
+
+ Pipeline p2 = Pipeline.create(options);
+ PCollection<Long> count2 = p2.apply(snapshotRead).apply(Count.globally());
+
+ PAssert.thatSingleton(count2).isEqualTo(numEntitiesBeforeReadTime);
+ p2.run();
}
@Test
@@ -114,12 +141,13 @@ public class V1ReadIT {
"SELECT * from %s WHERE __key__ HAS ANCESTOR KEY(%s, '%s')",
options.getKind(), options.getKind(), ancestor);
- long expectedNumEntities = numEntities;
+ long expectedNumEntities = totalNumEntities;
if (limit > 0) {
gqlQuery = String.format("%s LIMIT %d", gqlQuery, limit);
expectedNumEntities = limit;
}
+ // Read entities without readTime.
DatastoreV1.Read read =
DatastoreIO.v1()
.read()
@@ -133,18 +161,36 @@ public class V1ReadIT {
PAssert.thatSingleton(count).isEqualTo(expectedNumEntities);
p.run();
+
+ // Read entities with readTime.
+ DatastoreV1.Read snapshotRead =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(project)
+ .withLiteralGqlQuery(gqlQuery)
+ .withNamespace(options.getNamespace())
+ .withReadTime(readTime);
+ // The snapshot read needs its own pipeline: p has already been run above.
+ Pipeline p2 = Pipeline.create(options);
+ PCollection<Long> count2 = p2.apply(snapshotRead).apply(Count.globally());
+
+ long expectedNumEntities2 = limit > 0 ? limit : numEntitiesBeforeReadTime;
+ PAssert.thatSingleton(count2).isEqualTo(expectedNumEntities2);
+ p2.run();
}
// Creates entities and write them to datastore
private static void writeEntitiesToDatastore(
- V1TestOptions options, String project, String ancestor, long numEntities) throws Exception {
+ V1TestOptions options, String project, String ancestor, long valueOffset, long numEntities)
+ throws Exception {
Datastore datastore = getDatastore(options, project);
// Write test entities to datastore
V1TestWriter writer = new V1TestWriter(datastore, new UpsertMutationBuilder());
Key ancestorKey = makeAncestorKey(options.getNamespace(), options.getKind(), ancestor);
for (long i = 0; i < numEntities; i++) {
- Entity entity = makeEntity(i, ancestorKey, options.getKind(), options.getNamespace(), 0);
+ Entity entity =
+ makeEntity(valueOffset + i, ancestorKey, options.getKind(), options.getNamespace(), 0);
writer.write(entity);
}
writer.close();