You are viewing a plain text version of this content. The canonical link for it is here.
Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/07/21 00:40:48 UTC

[GitHub] [beam] chamikaramj commented on a diff in pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

chamikaramj commented on code in PR #22052:
URL: https://github.com/apache/beam/pull/22052#discussion_r926156052


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java:
##########
@@ -477,10 +508,16 @@ private static List<Query> splitQuery(
         @Nullable String namespace,
         Datastore datastore,
         QuerySplitter querySplitter,
-        int numSplits)
+        int numSplits,
+        @Nullable Instant readTime)
         throws DatastoreException {
       // If namespace is set, include it in the split request so splits are calculated accordingly.
-      return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore);
+      PartitionId partitionId = forNamespace(namespace).build();
+      if (readTime != null) {
+        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+        return querySplitter.getSplits(query, partitionId, numSplits, datastore, readTimeProto);
+      }
+      return querySplitter.getSplits(query, partitionId, numSplits, datastore);

Review Comment:
   Just to confirm, do sub-queries maintain the same read time ?
   
   Also, I wonder if we can implement a better splitter function here by using different read times (but that can be separate).



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java:
##########
@@ -146,782 +158,890 @@ public void setUp() {
     MetricsEnvironment.setProcessWideContainer(container);
   }
 
-  @Test
-  public void testBuildRead() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  @Test
-  public void testBuildReadWithGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withNamespace(NAMESPACE);
-    assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  /** {@link #testBuildRead} but constructed in a different order. */
-  @Test
-  public void testBuildReadAlt() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withQuery(QUERY)
-            .withNamespace(NAMESPACE)
-            .withProjectId(PROJECT_ID)
-            .withLocalhost(LOCALHOST);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-    assertEquals(LOCALHOST, read.getLocalhost());
-  }
-
-  @Test
-  public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withQuery(QUERY);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
-    read.expand(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+  @RunWith(JUnit4.class)
+  public static class SingletonTests extends DatastoreV1Test {
+    @Test
+    public void testBuildRead() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withNamespace(NAMESPACE);
+      assertEquals(QUERY, read.getQuery());
+      assertEquals(PROJECT_ID, read.getProjectId().get());
+      assertEquals(NAMESPACE, read.getNamespace().get());
+    }
 
-    DisplayData displayData = DisplayData.from(read);
+    @Test
+    public void testBuildReadWithReadTime() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withReadTime(TIMESTAMP);

Review Comment:
   Do existing tests fail without setting the new parameter ?
   If so, could customers run into similar issues ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org