Posted to github@beam.apache.org by "tonytanger (via GitHub)" <gi...@apache.org> on 2023/02/09 22:15:26 UTC

[GitHub] [beam] tonytanger opened a new pull request, #25411: Generate initial list of partitions to stream

tonytanger opened a new pull request, #25411:
URL: https://github.com/apache/beam/pull/25411

   Extend DetectNewPartitionDoFn to generate the initial list of partitions to output. This happens only when the restriction tracker is at 0, which is its initial value.
   
   This PR is based on https://github.com/apache/beam/pull/25364
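
The gating described above can be illustrated with a minimal, self-contained sketch. It does not use Beam: `ToyOffsetTracker` and `generateInitialPartitions` are hypothetical stand-ins written for this example, and the tracker is a simplified model (it only accepts in-range, strictly increasing claims). The point is only that initial partitions are emitted if and only if offset 0 can still be claimed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class InitialPartitionSketch {

  /** Hypothetical, simplified stand-in for an offset-based restriction tracker. */
  static final class ToyOffsetTracker {
    private final long from; // inclusive start of the restriction
    private final long to; // exclusive end of the restriction
    private long lastClaimed;
    private boolean claimedAny = false;

    ToyOffsetTracker(long from, long to) {
      this.from = from;
      this.to = to;
    }

    /** Accepts a claim only if it is inside [from, to) and above every earlier claim. */
    boolean tryClaim(long offset) {
      if (offset < from || offset >= to) {
        return false;
      }
      if (claimedAny && offset <= lastClaimed) {
        return false;
      }
      lastClaimed = offset;
      claimedAny = true;
      return true;
    }
  }

  /** Emits the initial partitions only when offset 0 can be claimed (assumed behaviour). */
  static List<String> generateInitialPartitions(
      ToyOffsetTracker tracker, List<String> initialPartitions) {
    List<String> output = new ArrayList<>();
    if (!tracker.tryClaim(0L)) {
      // The restriction no longer starts at 0, so this is not the first run.
      return output;
    }
    output.addAll(initialPartitions);
    return output;
  }

  public static void main(String[] args) {
    List<String> partitions = Arrays.asList("[a, m)", "[m, z)");

    // Fresh pipeline: the restriction starts at 0, so the partitions are emitted.
    ToyOffsetTracker fresh = new ToyOffsetTracker(0L, Long.MAX_VALUE);
    System.out.println(generateInitialPartitions(fresh, partitions));

    // Resumed pipeline: the restriction starts past 0, so nothing is emitted.
    ToyOffsetTracker resumed = new ToyOffsetTracker(1L, Long.MAX_VALUE);
    System.out.println(generateInitialPartitions(resumed, partitions));
  }
}
```

In this model, a second call with the same fresh tracker also emits nothing, because offset 0 has already been claimed.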
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscribe@beam.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [beam] pabloem merged pull request #25411: Generate initial list of partitions to stream

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem merged PR #25411:
URL: https://github.com/apache/beam/pull/25411




[GitHub] [beam] pabloem commented on pull request #25411: Generate initial list of partitions to stream

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on PR #25411:
URL: https://github.com/apache/beam/pull/25411#issuecomment-1429024188

   Failures in precommit are unrelated, and we're committing to a feature branch, so I'll go ahead and merge (leaving a question open, @tonytanger).




[GitHub] [beam] pabloem commented on a diff in pull request #25411: Generate initial list of partitions to stream

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on code in PR #25411:
URL: https://github.com/apache/beam/pull/25411#discussion_r1105229153


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java:
##########
@@ -47,12 +57,34 @@ public GenerateInitialPartitionsAction(
    * The very first step of the pipeline when there are no partitions being streamed yet. We want to
    * get an initial list of partitions to stream and output them.
    *
-   * @return true if this pipeline should continue, otherwise false.
+   * @return {@link ProcessContinuation#resume()} if the stream continues, otherwise {@link
+   *     ProcessContinuation#stop()}
    */
-  public boolean run(
+  public ProcessContinuation run(
       OutputReceiver<PartitionRecord> receiver,
+      RestrictionTracker<OffsetRange, Long> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      Timestamp startTime) {
-    return true;
+      com.google.cloud.Timestamp startTime) {
+    if (!tracker.tryClaim(0L)) {
+      LOG.error(
+          "Could not claim initial DetectNewPartition restriction. No partitions are outputted.");
+      return ProcessContinuation.stop();
+    }
+    List<ByteStringRange> streamPartitions =
+        changeStreamDao.generateInitialChangeStreamPartitions();
+
+    watermarkEstimator.setWatermark(TimestampConverter.toInstant(startTime));
+
+    for (ByteStringRange partition : streamPartitions) {
+      metrics.incListPartitionsCount();
+      String uid = UniqueIdGenerator.getNextId();
+      PartitionRecord partitionRecord =
+          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);

Review Comment:
   Have you tested this? Your comment addresses my main question, but I am curious why you'd need this if appropriate watermark information already comes from the record itself.





[GitHub] [beam] pabloem commented on pull request #25411: Generate initial list of partitions to stream

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on PR #25411:
URL: https://github.com/apache/beam/pull/25411#issuecomment-1428802638

   Run Java PreCommit




[GitHub] [beam] pabloem commented on a diff in pull request #25411: Generate initial list of partitions to stream

Posted by "pabloem (via GitHub)" <gi...@apache.org>.
pabloem commented on code in PR #25411:
URL: https://github.com/apache/beam/pull/25411#discussion_r1105229529


##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigtable/changestreams/action/GenerateInitialPartitionsAction.java:
##########
@@ -47,12 +57,34 @@ public GenerateInitialPartitionsAction(
    * The very first step of the pipeline when there are no partitions being streamed yet. We want to
    * get an initial list of partitions to stream and output them.
    *
-   * @return true if this pipeline should continue, otherwise false.
+   * @return {@link ProcessContinuation#resume()} if the stream continues, otherwise {@link
+   *     ProcessContinuation#stop()}
    */
-  public boolean run(
+  public ProcessContinuation run(
       OutputReceiver<PartitionRecord> receiver,
+      RestrictionTracker<OffsetRange, Long> tracker,
       ManualWatermarkEstimator<Instant> watermarkEstimator,
-      Timestamp startTime) {
-    return true;
+      com.google.cloud.Timestamp startTime) {
+    if (!tracker.tryClaim(0L)) {
+      LOG.error(
+          "Could not claim initial DetectNewPartition restriction. No partitions are outputted.");
+      return ProcessContinuation.stop();
+    }
+    List<ByteStringRange> streamPartitions =
+        changeStreamDao.generateInitialChangeStreamPartitions();
+
+    watermarkEstimator.setWatermark(TimestampConverter.toInstant(startTime));
+
+    for (ByteStringRange partition : streamPartitions) {
+      metrics.incListPartitionsCount();
+      String uid = UniqueIdGenerator.getNextId();
+      PartitionRecord partitionRecord =
+          new PartitionRecord(partition, startTime, uid, startTime, endTime);
+      // We are outputting elements with timestamp of 0 to prevent reliance on event time. This
+      // limits the ability to window on commit time of any data changes. It is still possible to
+      // window on processing time.
+      receiver.outputWithTimestamp(partitionRecord, Instant.EPOCH);

Review Comment:
   On the other hand, these are initial partitions, so perhaps it does not matter much what timestamps they carry (as long as individual change records have correct timestamps?).
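
To make the trade-off in the code comment concrete, here is a small sketch of why elements stamped with the epoch cannot be usefully windowed on event time. It is an illustration under stated assumptions, not Beam code: `fixedWindowStart` is a hypothetical helper that applies plain fixed-window arithmetic with zero offset, and it uses `java.time.Instant` rather than the Joda `Instant` that appears in the diff.

```java
import java.time.Duration;
import java.time.Instant;

public class EpochTimestampSketch {

  /** Hypothetical helper: start of the fixed window (zero offset) containing ts. */
  static Instant fixedWindowStart(Instant ts, Duration size) {
    long millis = ts.toEpochMilli();
    long sizeMillis = size.toMillis();
    // floorMod keeps the result correct for timestamps before the epoch as well.
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
  }

  public static void main(String[] args) {
    Duration oneMinute = Duration.ofMinutes(1);

    // Every element stamped with the epoch lands in the same first window,
    // whatever the pipeline's start time was.
    System.out.println(fixedWindowStart(Instant.EPOCH, oneMinute));

    // An element carrying a real event time lands in the window for that minute.
    Instant eventTime = Instant.parse("2023-02-09T22:15:26Z");
    System.out.println(fixedWindowStart(eventTime, oneMinute));
  }
}
```

Under these assumptions, all epoch-stamped partition records fall into one window, which matches the comment's point that only processing-time windowing remains meaningful for them.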





[GitHub] [beam] codecov[bot] commented on pull request #25411: Generate initial list of partitions to stream

Posted by "codecov[bot] (via GitHub)" <gi...@apache.org>.
codecov[bot] commented on PR #25411:
URL: https://github.com/apache/beam/pull/25411#issuecomment-1424939553

   # [Codecov](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#25411](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d5419b1) into [bigtable-cdc-feature-branch](https://codecov.io/gh/apache/beam/commit/d20d0b01c3c6bcde551420f36e13d794c930f1e2?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (d20d0b0) will **decrease** coverage by `0.02%`.
   > The diff coverage is `n/a`.
   
   ```diff
   @@                       Coverage Diff                       @@
   ##           bigtable-cdc-feature-branch   #25411      +/-   ##
   ===============================================================
   - Coverage                        72.96%   72.95%   -0.02%     
   ===============================================================
     Files                              745      745              
     Lines                            99174    99174              
   ===============================================================
   - Hits                             72362    72349      -13     
   - Misses                           25446    25459      +13     
     Partials                          1366     1366              
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [sdks/python/apache\_beam/utils/interactive\_utils.py](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vdXRpbHMvaW50ZXJhY3RpdmVfdXRpbHMucHk=) | `95.12% <0.00%> (-2.44%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/data\_plane.py](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvZGF0YV9wbGFuZS5weQ==) | `87.57% <0.00%> (-1.70%)` | :arrow_down: |
   | [...eam/runners/portability/fn\_api\_runner/execution.py](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9wb3J0YWJpbGl0eS9mbl9hcGlfcnVubmVyL2V4ZWN1dGlvbi5weQ==) | `92.49% <0.00%> (-0.64%)` | :arrow_down: |
   | [sdks/python/apache\_beam/runners/direct/executor.py](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy9kaXJlY3QvZXhlY3V0b3IucHk=) | `96.46% <0.00%> (-0.55%)` | :arrow_down: |
   | [...ks/python/apache\_beam/runners/worker/sdk\_worker.py](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvc2RrX3dvcmtlci5weQ==) | `89.08% <0.00%> (-0.17%)` | :arrow_down: |
   | [...hon/apache\_beam/runners/worker/bundle\_processor.py](https://codecov.io/gh/apache/beam/pull/25411?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-c2Rrcy9weXRob24vYXBhY2hlX2JlYW0vcnVubmVycy93b3JrZXIvYnVuZGxlX3Byb2Nlc3Nvci5weQ==) | `93.67% <0.00%> (+0.12%)` | :arrow_up: |
   
   




[GitHub] [beam] tonytanger commented on pull request #25411: Generate initial list of partitions to stream

Posted by "tonytanger (via GitHub)" <gi...@apache.org>.
tonytanger commented on PR #25411:
URL: https://github.com/apache/beam/pull/25411#issuecomment-1428772376

   Run Java_GCP_IO_Direct PreCommit




[GitHub] [beam] tonytanger commented on pull request #25411: Generate initial list of partitions to stream

Posted by "tonytanger (via GitHub)" <gi...@apache.org>.
tonytanger commented on PR #25411:
URL: https://github.com/apache/beam/pull/25411#issuecomment-1428562224

   R: @pabloem 

