You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2016/09/01 23:52:42 UTC

[2/2] incubator-beam git commit: DatastoreIO SplitQueryFn integration test

DatastoreIO SplitQueryFn integration test


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/446e0f27
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/446e0f27
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/446e0f27

Branch: refs/heads/master
Commit: 446e0f27e3ac83f1baaae4538b1c34fdaff72035
Parents: 268ec11
Author: Vikas Kedigehalli <vi...@google.com>
Authored: Mon Aug 29 13:55:32 2016 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Thu Sep 1 16:52:28 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/io/gcp/datastore/DatastoreV1.java  |  9 +-
 .../sdk/io/gcp/datastore/DatastoreV1Test.java   |  6 +-
 .../sdk/io/gcp/datastore/SplitQueryFnIT.java    | 97 ++++++++++++++++++++
 3 files changed, 107 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/446e0f27/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
index 8456e02..e24bc80 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java
@@ -24,6 +24,7 @@ import static com.google.common.base.Verify.verify;
 import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
 import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
 import static com.google.datastore.v1.QueryResultBatch.MoreResultsType.NOT_FINISHED;
+import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;
 import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
 import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1.client.DatastoreHelper.makeOrder;
@@ -290,7 +291,7 @@ public class DatastoreV1 {
         throws DatastoreException {
       String ourKind = query.getKind(0).getName();
       long latestTimestamp = queryLatestStatisticsTimestamp(datastore, namespace);
-      LOG.info("Latest stats timestamp : {}", latestTimestamp);
+      LOG.info("Latest stats timestamp for kind {} is {}", ourKind, latestTimestamp);
 
       Query.Builder queryBuilder = Query.newBuilder();
       if (namespace == null) {
@@ -298,8 +299,10 @@ public class DatastoreV1 {
       } else {
         queryBuilder.addKindBuilder().setName("__Stat_Ns_Kind__");
       }
-      queryBuilder.setFilter(makeFilter("kind_name", EQUAL, makeValue(ourKind).build()));
-      queryBuilder.setFilter(makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()));
+
+      queryBuilder.setFilter(makeAndFilter(
+          makeFilter("kind_name", EQUAL, makeValue(ourKind).build()).build(),
+          makeFilter("timestamp", EQUAL, makeValue(latestTimestamp).build()).build()));
 
       RunQueryRequest request = makeRequest(queryBuilder.build(), namespace);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/446e0f27/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
index 138671d..d96c320 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.io.gcp.datastore;
 
 import static com.google.datastore.v1.PropertyFilter.Operator.EQUAL;
 import static com.google.datastore.v1.PropertyOrder.Direction.DESCENDING;
+import static com.google.datastore.v1.client.DatastoreHelper.makeAndFilter;
 import static com.google.datastore.v1.client.DatastoreHelper.makeDelete;
 import static com.google.datastore.v1.client.DatastoreHelper.makeFilter;
 import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
@@ -805,8 +806,9 @@ public class DatastoreV1Test {
     } else {
       statQuery.addKindBuilder().setName("__Stat_Ns_Kind__");
     }
-    statQuery.setFilter(makeFilter("kind_name", EQUAL, makeValue(KIND)).build());
-    statQuery.setFilter(makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L)).build());
+    statQuery.setFilter(makeAndFilter(
+        makeFilter("kind_name", EQUAL, makeValue(KIND).build()).build(),
+        makeFilter("timestamp", EQUAL, makeValue(timestamp * 1000000L).build()).build()));
     return statQuery.build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/446e0f27/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
new file mode 100644
index 0000000..72ab7c2
--- /dev/null
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/SplitQueryFnIT.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.NUM_QUERY_SPLITS_MIN;
+import static org.junit.Assert.assertEquals;
+
+import com.google.datastore.v1.Query;
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.SplitQueryFn;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1.Read.V1Options;
+import org.apache.beam.sdk.transforms.DoFnTester;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link DatastoreV1.Read.SplitQueryFn}.
+ *
+ * <p> It is hard to mock the exact behavior of Cloud Datastore, especially for the statistics
+ * queries. Also the fact that DatastoreIO falls back gracefully when querying statistics fails,
+ * makes it hard to catch these issues in production. This test here ensures we interact with
+ * the Cloud Datastore directly, query the actual stats and verify that the SplitQueryFn generates
+ * the expected number of query splits.
+ *
+ * <p> These tests are brittle as they rely on statistics data in Cloud Datastore. If the data
+ * gets lost or changes then they will begin failing and this test should be disabled.
+ * At the time of writing, the Cloud Datastore has the following statistics,
+ * <ul>
+ *   <li>kind = sort_1G, entity_bytes = 2130000000, count = 10000000
+ *   <li>kind = shakespeare, entity_bytes = 26383451, count = 172948
+ * </ul>
+ */
+@RunWith(JUnit4.class)
+public class SplitQueryFnIT {
+  /**
+   * Tests {@link SplitQueryFn} to generate expected number of splits for a large dataset.
+   */
+  @Test
+  public void testSplitQueryFnWithLargeDataset() throws Exception {
+    String projectId = "apache-beam-testing";
+    String kind = "sort_1G";
+    String namespace = null;
+    // Num splits is computed based on the entity_bytes size of the input_sort_1G kind reported by
+    // Datastore stats.
+    int expectedNumSplits = 32;
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+  }
+
+  /**
+   * Tests {@link SplitQueryFn} to fallback to NUM_QUERY_SPLITS_MIN for a small dataset.
+   */
+  @Test
+  public void testSplitQueryFnWithSmallDataset() throws Exception {
+    String projectId = "apache-beam-testing";
+    String kind = "shakespeare";
+    String namespace = null;
+    int expectedNumSplits = NUM_QUERY_SPLITS_MIN;
+    testSplitQueryFn(projectId, kind, namespace, expectedNumSplits);
+  }
+
+  /**
+   * A helper method to test {@link SplitQueryFn} to generate the expected number of splits.
+   */
+  private void testSplitQueryFn(String projectId, String kind, @Nullable String namespace,
+      int expectedNumSplits) throws Exception {
+    Query.Builder query = Query.newBuilder();
+    query.addKindBuilder().setName(kind);
+
+    SplitQueryFn splitQueryFn = new SplitQueryFn(
+        V1Options.from(projectId, query.build(), namespace), 0);
+    DoFnTester<Query, KV<Integer, Query>> doFnTester = DoFnTester.of(splitQueryFn);
+
+    List<KV<Integer, Query>> queries = doFnTester.processBundle(query.build());
+    assertEquals(queries.size(), expectedNumSplits);
+  }
+
+  // TODO (vikasrk): Create datasets under a different namespace and add tests.
+}