Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2022/06/24 20:31:48 UTC

[GitHub] [beam] yixiaoshen opened a new pull request, #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

yixiaoshen opened a new pull request, #22052:
URL: https://github.com/apache/beam/pull/22052

   Allows DatastoreIO to be configured with a read_time. When one is set, the Beam workload issues all of its Datastore reads, including the query-splitting request, at that same read_time, so every split read returns results from one consistent view of the database.
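   
   To illustrate the consistency argument above, here is a minimal, self-contained sketch. It assumes a toy multi-version key/value store; `SnapshotReadSketch` and its methods are hypothetical and are not the Datastore or Beam API. Two splits read accounts A and B one after the other while a transfer from A to B commits between the two reads. When both splits read at one shared read time the total stays 100; when each split reads at "now", the transferred amount is counted twice.
   
   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.NavigableMap;
   import java.util.TreeMap;
   
   /** Toy multi-version store (hypothetical): writes add new versions instead of overwriting. */
   class SnapshotReadSketch {
     // key -> (commit timestamp -> value)
     private final Map<String, NavigableMap<Long, Long>> versions = new HashMap<>();
     private long clock = 0;
   
     /** Applies all writes atomically at one new commit timestamp and returns it. */
     long commit(Map<String, Long> writes) {
       clock++;
       for (Map.Entry<String, Long> w : writes.entrySet()) {
         versions.computeIfAbsent(w.getKey(), k -> new TreeMap<>()).put(clock, w.getValue());
       }
       return clock;
     }
   
     long now() {
       return clock;
     }
   
     /** Returns the latest value of key committed at or before readTime (0 if none). */
     long readAt(String key, long readTime) {
       NavigableMap<Long, Long> history = versions.get(key);
       Map.Entry<Long, Long> latest = history == null ? null : history.floorEntry(readTime);
       return latest == null ? 0L : latest.getValue();
     }
   
     /** Split 1 reads A, a transfer A -> B commits, then split 2 reads B; returns A + B as seen. */
     static long totalSeenBySplits(boolean useSharedReadTime) {
       SnapshotReadSketch store = new SnapshotReadSketch();
       store.commit(Map.of("A", 100L, "B", 0L));
       long readTime = store.now(); // pinned once, before any split runs
   
       long a = store.readAt("A", useSharedReadTime ? readTime : store.now());
       store.commit(Map.of("A", 0L, "B", 100L)); // concurrent write between the split reads
       long b = store.readAt("B", useSharedReadTime ? readTime : store.now());
       return a + b;
     }
   
     public static void main(String[] args) {
       System.out.println("shared read time: " + totalSeenBySplits(true)); // 100
       System.out.println("per-split 'now':  " + totalSeenBySplits(false)); // 200
     }
   }
   ```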
   
   addresses #22051
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Mention the appropriate issue in your description (for example: `addresses #123`), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   To check the build health, please visit [https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   ------------------------------------------------------------------------------------------------
   [![Build python source distribution and wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more information about GitHub Actions CI.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] asf-ci commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
asf-ci commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1165914433

   Can one of the admins verify this patch?


[GitHub] [beam] yixiaoshen commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
yixiaoshen commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1165953047

   R: @pcostell


[GitHub] [beam] yixiaoshen commented on a diff in pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
yixiaoshen commented on code in PR #22052:
URL: https://github.com/apache/beam/pull/22052#discussion_r927903654


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java:
##########
@@ -146,782 +158,890 @@ public void setUp() {
     MetricsEnvironment.setProcessWideContainer(container);
   }
 
-  @Test
-  public void testBuildRead() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  @Test
-  public void testBuildReadWithGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withNamespace(NAMESPACE);
-    assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  /** {@link #testBuildRead} but constructed in a different order. */
-  @Test
-  public void testBuildReadAlt() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withQuery(QUERY)
-            .withNamespace(NAMESPACE)
-            .withProjectId(PROJECT_ID)
-            .withLocalhost(LOCALHOST);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-    assertEquals(LOCALHOST, read.getLocalhost());
-  }
-
-  @Test
-  public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withQuery(QUERY);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
-    read.expand(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+  @RunWith(JUnit4.class)
+  public static class SingletonTests extends DatastoreV1Test {
+    @Test
+    public void testBuildRead() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withNamespace(NAMESPACE);
+      assertEquals(QUERY, read.getQuery());
+      assertEquals(PROJECT_ID, read.getProjectId().get());
+      assertEquals(NAMESPACE, read.getNamespace().get());
+    }
 
-    DisplayData displayData = DisplayData.from(read);
+    @Test
+    public void testBuildReadWithReadTime() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withReadTime(TIMESTAMP);

Review Comment:
   No. The new readTime parameter is optional and need not be specified if customers don't care about read consistency across sub-queries. For example, just above this test (testBuildReadWithReadTime) there is another test (testBuildRead) that builds a DatastoreIO read with the same project ID and query but without specifying readTime.
   
   Also, the V1ReadIT test performs one read without a read time and another with one, and both work. Existing customer workloads therefore don't need to change and won't break.
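   
   A minimal sketch of why an optional read time is backward compatible, under a simplified model: `ReadConfigSketch` is hypothetical and only mimics the immutable `withX()` style of `DatastoreIO.v1().read()`; it is not `DatastoreV1.Read`. A config built without `withReadTime` describes exactly the request it described before, and only callers that opt in get a `read_time`.
   
   ```java
   import java.util.Objects;
   
   /** Hypothetical immutable read configuration with an optional read time. */
   final class ReadConfigSketch {
     private final String projectId;
     private final String query;
     private final Long readTimeMillis; // null means "no read time": read the latest data
   
     private ReadConfigSketch(String projectId, String query, Long readTimeMillis) {
       this.projectId = Objects.requireNonNull(projectId);
       this.query = Objects.requireNonNull(query);
       this.readTimeMillis = readTimeMillis;
     }
   
     static ReadConfigSketch of(String projectId, String query) {
       return new ReadConfigSketch(projectId, query, null);
     }
   
     /** Returns a copy with the read time set; the original instance is unchanged. */
     ReadConfigSketch withReadTime(long millis) {
       return new ReadConfigSketch(projectId, query, millis);
     }
   
     /** Describes the request each split would send; read_time appears only when configured. */
     String describeRequest() {
       String base = "project=" + projectId + " query=" + query;
       return readTimeMillis == null ? base : base + " read_time=" + readTimeMillis;
     }
   
     public static void main(String[] args) {
       ReadConfigSketch existing = ReadConfigSketch.of("my-project", "SELECT * FROM Kind");
       ReadConfigSketch pinned = existing.withReadTime(1_650_000_000_000L);
       System.out.println(existing.describeRequest());
       System.out.println(pinned.describeRequest());
     }
   }
   ```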



[GitHub] [beam] chamikaramj commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1192840439

   Run Java PreCommit


[GitHub] [beam] chamikaramj commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1196148662

   Run SQL_Java11 PreCommit


[GitHub] [beam] codecov[bot] commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
codecov[bot] commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1165934647

   # [Codecov](https://codecov.io/gh/apache/beam/pull/22052?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#22052](https://codecov.io/gh/apache/beam/pull/22052?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (aad8c94) into [master](https://codecov.io/gh/apache/beam/commit/610323d1e6211f528805c3a6f47bcca0968a206d?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (610323d) will **decrease** coverage by `0.00%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@            Coverage Diff             @@
   ##           master   #22052      +/-   ##
   ==========================================
   - Coverage   74.00%   73.99%   -0.01%     
   ==========================================
     Files         703      703              
     Lines       92936    92936              
   ==========================================
   - Hits        68779    68770       -9     
   - Misses      22891    22900       +9     
     Partials     1266     1266              
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | python | `83.57% <ø> (-0.02%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/22052?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [.../apache\_beam/runners/interactive/dataproc/types.py](https://codecov.io/gh/apache/beam/pull/22052/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9pbnRlcmFjdGl2ZS9kYXRhcHJvYy90eXBlcy5weQ==) | `93.10% <0.00%> (-3.45%)` | :arrow_down: |
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/22052/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.12% <0.00%> (-2.44%)` | :arrow_down: |
   | [...n/apache\_beam/ml/gcp/recommendations\_ai\_test\_it.py](https://codecov.io/gh/apache/beam/pull/22052/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vbWwvZ2NwL3JlY29tbWVuZGF0aW9uc19haV90ZXN0X2l0LnB5) | `73.46% <0.00%> (-2.05%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/22052/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.44% <0.00%> (-0.65%)` | :arrow_down: |
   | [sdks/python/apache\_beam/transforms/combiners.py](https://codecov.io/gh/apache/beam/pull/22052/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdHJhbnNmb3Jtcy9jb21iaW5lcnMucHk=) | `93.05% <0.00%> (-0.39%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/common.py](https://codecov.io/gh/apache/beam/pull/22052/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9jb21tb24ucHk=) | `88.59% <0.00%> (-0.13%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/22052/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.54% <0.00%> (+0.12%)` | :arrow_up: |
   
   ------
   
   [Continue to review full report at Codecov](https://codecov.io/gh/apache/beam/pull/22052?src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/beam/pull/22052?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [610323d...aad8c94](https://codecov.io/gh/apache/beam/pull/22052?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


[GitHub] [beam] pcostell commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
pcostell commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1178366421

   LGTM


[GitHub] [beam] chamikaramj commented on a diff in pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on code in PR #22052:
URL: https://github.com/apache/beam/pull/22052#discussion_r926156052


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java:
##########
@@ -477,10 +508,16 @@ private static List<Query> splitQuery(
         @Nullable String namespace,
         Datastore datastore,
         QuerySplitter querySplitter,
-        int numSplits)
+        int numSplits,
+        @Nullable Instant readTime)
         throws DatastoreException {
       // If namespace is set, include it in the split request so splits are calculated accordingly.
-      return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore);
+      PartitionId partitionId = forNamespace(namespace).build();
+      if (readTime != null) {
+        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+        return querySplitter.getSplits(query, partitionId, numSplits, datastore, readTimeProto);
+      }
+      return querySplitter.getSplits(query, partitionId, numSplits, datastore);

Review Comment:
   Just to confirm, do sub-queries maintain the same read time?
   
   Also, I wonder if we can implement a better splitter function here by using different read times (but that can be separate).



##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1Test.java:
##########
@@ -146,782 +158,890 @@ public void setUp() {
     MetricsEnvironment.setProcessWideContainer(container);
   }
 
-  @Test
-  public void testBuildRead() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  @Test
-  public void testBuildReadWithGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withNamespace(NAMESPACE);
-    assertEquals(GQL_QUERY, read.getLiteralGqlQuery().get());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-  }
-
-  /** {@link #testBuildRead} but constructed in a different order. */
-  @Test
-  public void testBuildReadAlt() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withQuery(QUERY)
-            .withNamespace(NAMESPACE)
-            .withProjectId(PROJECT_ID)
-            .withLocalhost(LOCALHOST);
-    assertEquals(QUERY, read.getQuery());
-    assertEquals(PROJECT_ID, read.getProjectId().get());
-    assertEquals(NAMESPACE, read.getNamespace().get());
-    assertEquals(LOCALHOST, read.getLocalhost());
-  }
-
-  @Test
-  public void testReadValidationFailsQueryAndGqlQuery() throws Exception {
-    DatastoreV1.Read read =
-        DatastoreIO.v1()
-            .read()
-            .withProjectId(PROJECT_ID)
-            .withLiteralGqlQuery(GQL_QUERY)
-            .withQuery(QUERY);
-
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("withQuery() and withLiteralGqlQuery() are exclusive");
-    read.expand(null);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitZero() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(0)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit 0: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadValidationFailsQueryLimitNegative() throws Exception {
-    Query invalidLimit = Query.newBuilder().setLimit(Int32Value.newBuilder().setValue(-5)).build();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Invalid query limit -5: must be positive");
-
-    DatastoreIO.v1().read().withQuery(invalidLimit);
-  }
-
-  @Test
-  public void testReadDisplayData() {
-    DatastoreV1.Read read =
-        DatastoreIO.v1().read().withProjectId(PROJECT_ID).withQuery(QUERY).withNamespace(NAMESPACE);
+  @RunWith(JUnit4.class)
+  public static class SingletonTests extends DatastoreV1Test {
+    @Test
+    public void testBuildRead() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withNamespace(NAMESPACE);
+      assertEquals(QUERY, read.getQuery());
+      assertEquals(PROJECT_ID, read.getProjectId().get());
+      assertEquals(NAMESPACE, read.getNamespace().get());
+    }
 
-    DisplayData displayData = DisplayData.from(read);
+    @Test
+    public void testBuildReadWithReadTime() throws Exception {
+      DatastoreV1.Read read =
+          DatastoreIO.v1()
+              .read()
+              .withProjectId(PROJECT_ID)
+              .withQuery(QUERY)
+              .withReadTime(TIMESTAMP);

Review Comment:
   Do existing tests fail without setting the new parameter?
   If so, could customers run into similar issues?



[GitHub] [beam] apilloud commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
apilloud commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1197266393

   This broke the Java Dataflow Postcommit:
   https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV1/1906/
   https://ci-beam.apache.org/job/beam_PostCommit_Java_DataflowV2/1910/


[GitHub] [beam] yixiaoshen commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
yixiaoshen commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1178370264

   R: @chamikaramj


[GitHub] [beam] yixiaoshen commented on a diff in pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
yixiaoshen commented on code in PR #22052:
URL: https://github.com/apache/beam/pull/22052#discussion_r927906163


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DatastoreV1.java:
##########
@@ -477,10 +508,16 @@ private static List<Query> splitQuery(
         @Nullable String namespace,
         Datastore datastore,
         QuerySplitter querySplitter,
-        int numSplits)
+        int numSplits,
+        @Nullable Instant readTime)
         throws DatastoreException {
       // If namespace is set, include it in the split request so splits are calculated accordingly.
-      return querySplitter.getSplits(query, forNamespace(namespace).build(), numSplits, datastore);
+      PartitionId partitionId = forNamespace(namespace).build();
+      if (readTime != null) {
+        Timestamp readTimeProto = Timestamps.fromMillis(readTime.getMillis());
+        return querySplitter.getSplits(query, partitionId, numSplits, datastore, readTimeProto);
+      }
+      return querySplitter.getSplits(query, partitionId, numSplits, datastore);

Review Comment:
   All sub-queries maintain the same read time; that way we can guarantee a consistent read across the entire database from the Beam workload.
   
   The split function queries Datastore using the split points (a special index). Having the split function use the same read time as all other queries ensures that the split points are accurate as of that read time, which affects how well and how evenly the Beam workload can be partitioned.
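The consistency argument above (every split sub-query sharing one read time) can be illustrated with a toy model. This is a hypothetical, stdlib-only sketch: `SnapshotReadSketch`, `VersionedStore`, and `sumAcrossSplits` are invented for illustration and are not part of the Beam or Datastore APIs. It assumes a multi-version store in which a read at time T returns the latest value committed at or before T. A concurrent transaction moves 20 units from key "a" to key "b" between the two split reads; reading both splits at one shared read time preserves the invariant a + b == 100, while reading each split at its own "current" time does not.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical in-memory illustration of snapshot ("read time") reads.
 * NOT the Datastore or Beam API; it only models why split sub-queries
 * should all use the same read time.
 */
final class SnapshotReadSketch {

  /** Toy multi-version store: key -> (commit timestamp -> value). */
  static final class VersionedStore {
    private final Map<String, TreeMap<Long, Integer>> versions = new TreeMap<>();

    void put(String key, int value, long commitTime) {
      versions.computeIfAbsent(key, k -> new TreeMap<>()).put(commitTime, value);
    }

    /** Returns the latest value committed at or before readTime, or null if none. */
    Integer read(String key, long readTime) {
      TreeMap<Long, Integer> history = versions.get(key);
      if (history == null) {
        return null;
      }
      Map.Entry<Long, Integer> entry = history.floorEntry(readTime);
      return entry == null ? null : entry.getValue();
    }
  }

  /**
   * Reads two splits in turn, with a concurrent write landing after the first split.
   * If fixedReadTime is non-null, every split reads at that time; otherwise each split
   * reads at the clock value current when it runs.
   */
  static int sumAcrossSplits(Long fixedReadTime) {
    VersionedStore store = new VersionedStore();
    // Initial state committed at t=10; invariant: a + b == 100.
    store.put("a", 60, 10);
    store.put("b", 40, 10);

    List<List<String>> splits = List.of(List.of("a"), List.of("b"));
    long clock = 20;
    int sum = 0;
    for (int i = 0; i < splits.size(); i++) {
      long readTime = fixedReadTime != null ? fixedReadTime : clock;
      for (String key : splits.get(i)) {
        sum += store.read(key, readTime);
      }
      if (i == 0) {
        // Concurrent transaction at t=30 moves 20 from "a" to "b" (invariant still holds).
        store.put("a", 40, 30);
        store.put("b", 60, 30);
        clock = 40;
      }
    }
    return sum;
  }

  public static void main(String[] args) {
    System.out.println("shared read time t=20: " + sumAcrossSplits(20L)); // prints 100
    System.out.println("per-split current-time reads: " + sumAcrossSplits(null)); // prints 120
  }
}
```

With a shared read time, the second split still sees the t=10 version of "b", so the result reflects a single snapshot; without it, the second split sees the post-transfer value and the total is inconsistent.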





[GitHub] [beam] yixiaoshen commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
yixiaoshen commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1197522686

   attempting to fix with #22484





[GitHub] [beam] chamikaramj merged pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
chamikaramj merged PR #22052:
URL: https://github.com/apache/beam/pull/22052




[GitHub] [beam] chamikaramj commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1192840550

   Run Java_PVR_Flink_Batch PreCommit




[GitHub] [beam] chamikaramj commented on pull request #22052: [#22051]: Add read_time support to Google Cloud Datastore connector

Posted by GitBox <gi...@apache.org>.
chamikaramj commented on PR #22052:
URL: https://github.com/apache/beam/pull/22052#issuecomment-1192840781

   Retest this please

