You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/01 23:52:42 UTC
[2/2] incubator-beam git commit: DatastoreIO SplitQueryFn integration test
DatastoreIO SplitQueryFn integration test
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/446e0f27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/446e0f27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/446e0f27
Branch: refs/heads/master
Commit: 446e0f27e3ac83f1baaae4538b1c34fdaff72035
Parents: 268ec11
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Aug 29 13:55:32 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Sep 1 16:52:28 2016 -0700
----------------------------------------------------------------------
.../beam/sdk/io/gcp/datastore/DatastoreV1.java | 9 +-
.../sdk/io/gcp/datastore/DatastoreV1Test.java | 6 +-
.../sdk/io/gcp/datastore/SplitQueryFnIT.java | 97 ++++++++++++++++++++
3 files changed, 107 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/446e0f27/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 8456e02..e24bc80 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Verify.verify;
import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
@@ -290,7 +291,7 @@ public class DatastoreV1 {
throws DatastoreException {
String ourKind = query.getKind(0).getName();
long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
- LOG.info("Latest stats timestamp : {}", latestTimestamp);
+ LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);
Query.Builder queryBuilder = Query.newBuilder();
if (namespace == null) {
@@ -298,8 +299,10 @@ public class DatastoreV1 {
} else {
queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__");
}
- queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
- queryBuilder.setFilter(makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()));
+
+ queryBuilder.setFilter(makeAndFilter(
+ makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(),
+ makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build()));
RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/446e0f27/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 138671d..d96c320 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.datastore;
import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
@@ -805,8 +806,9 @@ public class DatastoreV1Test {
} else {
statQuery.addKindBuilder().setName("__Stat_Ns_Kind__");
}
- statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
- statQuery.setFilter(makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L)).build());
+ statQuery.setFilter(makeAndFilter(
+ makeFilter("kind_name", EQUAL, makeValue(KIND).build()).build(),
+ makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L).build()).build()));
return statQuery.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/446e0f27/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
new file mode 100644
index 0000000..72ab7c2
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.NUM_QUERY_SPLITS_MIN;
+import static org.junit.Assert.assertEquals;
+
+import com.google.datastore.v1.Query;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link DatastoreV1.Read.SplitQueryFn}.
+ *
+ * <p>It is hard to mock the exact behavior of Cloud Datastore, especially for the statistics
+ * queries. Also, because DatastoreIO falls back gracefully when querying statistics fails,
+ * such issues are hard to catch in production. This test ensures that we interact with
+ * Cloud Datastore directly, query the actual stats, and verify that the SplitQueryFn generates
+ * the expected number of query splits.
+ *
+ * <p>These tests are brittle as they rely on statistics data in Cloud Datastore. If the data
+ * gets lost or changes, they will begin failing and this test should be disabled.
+ * At the time of writing, Cloud Datastore has the following statistics:
+ * <ul>
+ * <li>kind = sort_1G, entity_bytes = 2130000000, count = 10000000
+ * <li>kind = shakespeare, entity_bytes = 26383451, count = 172948
+ * </ul>
+ */
+@RunWith(JUnit4.class)
+public class SplitQueryFnIT {
+  /**
+   * Tests that {@link SplitQueryFn} generates the expected number of splits for a large dataset.
+   */
+  @Test
+  public void testSplitQueryFnWithLargeDataset() throws Exception {
+    String projectId = "apache-beam-testing";
+    String kind = "sort_1G";
+    String namespace = null;
+    // Num splits is computed based on the entity_bytes size of the sort_1G kind reported by
+    // Datastore stats.
+    int expectedNumSplits = 32;
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+  }
+
+  /**
+   * Tests that {@link SplitQueryFn} falls back to {@code NUM_QUERY_SPLITS_MIN} for a small dataset.
+   */
+  @Test
+  public void testSplitQueryFnWithSmallDataset() throws Exception {
+    String projectId = "apache-beam-testing";
+    String kind = "shakespeare";
+    String namespace = null;
+    int expectedNumSplits = NUM_QUERY_SPLITS_MIN;
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+  }
+
+  /**
+   * Runs {@link SplitQueryFn} on a query for {@code kind} and asserts the number of splits.
+   */
+  private void testSplitQueryFn(String projectId, String kind, @Nullable String namespace,
+      int expectedNumSplits) throws Exception {
+    Query.Builder query = Query.newBuilder();
+    query.addKindBuilder().setName(kind);
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(
+        V1Options.from(projectId, query.build(), namespace), 0);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(query.build());
+    assertEquals(expectedNumSplits, queries.size());
+  }
+
+  // TODO (vikasrk): Create datasets under a different namespace and add tests.
+}