Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/04/14 20:26:16 UTC

[GitHub] [beam] boyuanzz opened a new pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

boyuanzz opened a new pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418
 
 
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [beam] boyuanzz commented on issue #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#issuecomment-614853115
 
 
   Run RAT PreCommit


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r408473046
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 ##########
 @@ -49,13 +50,22 @@ public OffsetRange currentRestriction() {
 
   @Override
   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8872): Add support for splitting off a fixed amount of work for this restriction
-    // instead of only supporting checkpointing.
-
-    checkState(
-        lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed");
-    OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
-    this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
+    checkState(lastClaimedOffset != null, "Can't split before any offset was successfully claimed");
 
 Review comment:
   We can split before any successfully claimed block by returning `[from, to)` and updating the current range to be `[from, from)`
   
   This makes sense in cases where we want to hand off all the work for the active element to someone else while this bundle finishes other processing.
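
As a rough illustration of the behaviour described in this comment, here is a minimal, self-contained sketch. It is not the Beam implementation: `UnstartedSplitSketch` and its nested `Range` are hypothetical stand-ins for `OffsetRangeTracker` and `OffsetRange`, and only the checkpoint case (fraction 0) is modelled.

```java
/** Hypothetical sketch, not the real OffsetRangeTracker. */
final class UnstartedSplitSketch {
  /** Hypothetical stand-in for OffsetRange: a half-open interval [from, to). */
  static final class Range {
    final long from;
    final long to;

    Range(long from, long to) {
      this.from = from;
      this.to = to;
    }

    @Override
    public String toString() {
      return "[" + from + ", " + to + ")";
    }
  }

  Range range;
  Long lastClaimedOffset; // null until the first successful claim

  UnstartedSplitSketch(Range range) {
    this.range = range;
  }

  /** Returns {primary, residual}, or null if nothing is left to hand off. */
  Range[] checkpoint() {
    // With no claim yet, everything from `from` onward is still unprocessed.
    long splitPos = (lastClaimedOffset == null) ? range.from : lastClaimedOffset + 1;
    if (splitPos >= range.to) {
      return null;
    }
    Range residual = new Range(splitPos, range.to);
    range = new Range(range.from, splitPos); // primary becomes [from, from) when unstarted
    return new Range[] {range, residual};
  }
}
```

For a tracker over `[100, 200)` with nothing claimed, `checkpoint()` in this sketch yields primary `[100, 100)` and residual `[100, 200)`, and a second call returns null because the primary is already empty.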


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r408473271
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 ##########
 @@ -49,13 +50,22 @@ public OffsetRange currentRestriction() {
 
   @Override
   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8872): Add support for splitting off a fixed amount of work for this restriction
-    // instead of only supporting checkpointing.
-
-    checkState(
-        lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed");
-    OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
-    this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
+    checkState(lastClaimedOffset != null, "Can't split before any offset was successfully claimed");
+    // No more split should be performed if checkpoint has happened.
+    if (checkpointed) {
+      return null;
+    }
+    Long splitPos =
+        lastClaimedOffset
+            + Math.max(1L, (long) ((range.getTo() - lastClaimedOffset) * fractionOfRemainder));
+    if (splitPos >= range.getTo()) {
+      return null;
+    }
+    if (fractionOfRemainder == 0.0) {
+      checkpointed = true;
 
 Review comment:
   Why do we need `checkpointed`?
   
   Shouldn't the range restriction change so that `to` becomes `lastClaimed` (or `from` if nothing has been claimed)?
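
One way to read this suggestion, sketched under assumptions: if a split shrinks the tracked range itself, a later split attempt finds nothing left and returns null without any extra flag. The class below is hypothetical and reuses the split-position formula from the diff above. Because ranges are half-open, the new end in this sketch is `lastClaimed + 1`, as in the removed `new OffsetRange(range.getFrom(), lastClaimedOffset + 1)` line.

```java
/** Hypothetical sketch: checkpointing without a separate `checkpointed` boolean. */
final class NoFlagCheckpointSketch {
  final long from;
  long to; // exclusive end of the current (primary) range
  final long lastClaimed;

  NoFlagCheckpointSketch(long from, long to, long lastClaimed) {
    this.from = from;
    this.to = to;
    this.lastClaimed = lastClaimed;
  }

  /** Returns the residual as {from, to}, or null when no split is possible. */
  long[] trySplit(double fractionOfRemainder) {
    long splitPos =
        lastClaimed + Math.max(1L, (long) ((to - lastClaimed) * fractionOfRemainder));
    if (splitPos >= to) {
      return null;
    }
    long[] residual = {splitPos, to};
    to = splitPos; // shrinking the primary is the only state change
    return residual;
  }
}
```

After `trySplit(0.0)` on `[100, 200)` with 160 claimed, the primary in this sketch is `[100, 161)`, so any further `trySplit` call computes a position of at least 161 and returns null from the existing bounds check.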


[GitHub] [beam] boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409824692
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 ##########
 @@ -47,16 +48,9 @@ public void testTryClaim() throws Exception {
   @Test
   public void testCheckpointUnstarted() throws Exception {
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
-    expected.expect(IllegalStateException.class);
-    tracker.trySplit(0).getResidual();
-  }
-
-  @Test
-  public void testCheckpointOnlyFailedClaim() throws Exception {
-    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
-    assertFalse(tracker.tryClaim(250L));
-    expected.expect(IllegalStateException.class);
-    OffsetRange checkpoint = tracker.trySplit(0).getResidual();
+    SplitResult res = tracker.trySplit(0);
+    assertEquals(new OffsetRange(100, 100), res.getPrimary());
+    assertEquals(new OffsetRange(100, 200), res.getResidual());
 
 Review comment:
   In this test case, the expected primary is [100, 100) and the expected residual is [100, 200)


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409832148
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 ##########
 @@ -47,16 +48,9 @@ public void testTryClaim() throws Exception {
   @Test
   public void testCheckpointUnstarted() throws Exception {
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
-    expected.expect(IllegalStateException.class);
-    tracker.trySplit(0).getResidual();
-  }
-
-  @Test
-  public void testCheckpointOnlyFailedClaim() throws Exception {
-    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
-    assertFalse(tracker.tryClaim(250L));
-    expected.expect(IllegalStateException.class);
-    OffsetRange checkpoint = tracker.trySplit(0).getResidual();
+    SplitResult res = tracker.trySplit(0);
+    assertEquals(new OffsetRange(100, 100), res.getPrimary());
+    assertEquals(new OffsetRange(100, 200), res.getResidual());
 
 Review comment:
   Thanks, that was my mistake, I read both lines as being `[100, 200)`


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409818060
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 ##########
 @@ -96,9 +90,24 @@ public void testCheckpointAfterFailedClaim() throws Exception {
     assertTrue(tracker.tryClaim(110L));
     assertTrue(tracker.tryClaim(160L));
     assertFalse(tracker.tryClaim(240L));
-    OffsetRange checkpoint = tracker.trySplit(0).getResidual();
-    assertEquals(new OffsetRange(100, 161), tracker.currentRestriction());
-    assertEquals(new OffsetRange(161, 200), checkpoint);
+    assertNull(tracker.trySplit(0));
+  }
 
 Review comment:
   ```suggestion
       tracker.checkDone();
     }
   ```


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409807914
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 ##########
 @@ -210,9 +210,10 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
         // the call says that not the whole restriction has been processed. So we need to take
         // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly
         // the work that was done in the current ProcessElement call, and returns a residual
-        // restriction that describes exactly the work that wasn't done in the current call.
+        // restriction that describes exactly the work that wasn't done in the current call. The
+        // residual is null when the entire restriction has been processed.
         if (processContext.numClaimedBlocks > 0) {
-          residual = checkNotNull(processContext.takeCheckpointNow());
+          residual = processContext.takeCheckpointNow();
           processContext.tracker.checkDone();
         } else {
 
 Review comment:
   The comments below will likely need updating


[GitHub] [beam] boyuanzz commented on issue #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#issuecomment-615447007
 
 
   Run Java PreCommit


[GitHub] [beam] boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r408483381
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 ##########
 @@ -49,13 +50,22 @@ public OffsetRange currentRestriction() {
 
   @Override
   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8872): Add support for splitting off a fixed amount of work for this restriction
-    // instead of only supporting checkpointing.
-
-    checkState(
-        lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed");
-    OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
-    this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
+    checkState(lastClaimedOffset != null, "Can't split before any offset was successfully claimed");
 
 Review comment:
   Allowing a split before the first claim makes sense to me. Python already allows that.


[GitHub] [beam] boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409826545
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 ##########
 @@ -19,6 +19,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.io.range.OffsetRange;
 
 Review comment:
   `trySplit` right?


[GitHub] [beam] boyuanzz commented on issue #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on issue #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#issuecomment-614781245
 
 
   Run Java PreCommit


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r408943910
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 ##########
 @@ -49,13 +50,22 @@ public OffsetRange currentRestriction() {
 
   @Override
   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8872): Add support for splitting off a fixed amount of work for this restriction
-    // instead of only supporting checkpointing.
-
-    checkState(
-        lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed");
-    OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
-    this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
+    checkState(lastClaimedOffset != null, "Can't split before any offset was successfully claimed");
+    // No more split should be performed if checkpoint has happened.
+    if (checkpointed) {
+      return null;
+    }
+    Long splitPos =
+        lastClaimedOffset
+            + Math.max(1L, (long) ((range.getTo() - lastClaimedOffset) * fractionOfRemainder));
+    if (splitPos >= range.getTo()) {
+      return null;
+    }
+    if (fractionOfRemainder == 0.0) {
+      checkpointed = true;
 
 Review comment:
   Changing the state makes the code more complicated, though: the bounds checking now varies depending on whether you got 0.0 or 0.00001, even though both are likely to produce the same final state, with one of them additionally setting a boolean.
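
To make the arithmetic concrete, the hypothetical helper below applies the split-position formula from the diff to a range ending at 200 with 160 as the last claimed offset (values chosen purely for illustration). Fractions 0.0 and 0.00001 both truncate to the minimum step of 1 and land on the same position.

```java
/** Hypothetical helper illustrating the split-position formula from the diff. */
final class SplitPositionSketch {
  /** Split position: always at least one past the last claimed offset. */
  static long splitPosition(long lastClaimed, long to, double fractionOfRemainder) {
    return lastClaimed + Math.max(1L, (long) ((to - lastClaimed) * fractionOfRemainder));
  }

  public static void main(String[] args) {
    System.out.println(splitPosition(160, 200, 0.0)); // 161
    System.out.println(splitPosition(160, 200, 0.00001)); // 161: 40 * 0.00001 truncates to 0
    System.out.println(splitPosition(160, 200, 0.5)); // 180
  }
}
```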


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409806021
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 ##########
 @@ -47,16 +48,9 @@ public void testTryClaim() throws Exception {
   @Test
   public void testCheckpointUnstarted() throws Exception {
     OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
-    expected.expect(IllegalStateException.class);
-    tracker.trySplit(0).getResidual();
-  }
-
-  @Test
-  public void testCheckpointOnlyFailedClaim() throws Exception {
-    OffsetRangeTracker tracker = new OffsetRangeTracker(new OffsetRange(100, 200));
-    assertFalse(tracker.tryClaim(250L));
-    expected.expect(IllegalStateException.class);
-    OffsetRange checkpoint = tracker.trySplit(0).getResidual();
+    SplitResult res = tracker.trySplit(0);
+    assertEquals(new OffsetRange(100, 100), res.getPrimary());
+    assertEquals(new OffsetRange(100, 200), res.getResidual());
 
 Review comment:
   Primary and residual shouldn't have the same value; primary should be an empty range like `[100, 100)`.
   


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409820103
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableProcessElementInvoker.java
 ##########
 @@ -51,11 +51,6 @@ public Result(
         @Nullable WatermarkEstimatorStateT futureWatermarkEstimatorState) {
       checkArgument(continuation != null, "continuation must not be null");
       this.continuation = continuation;
-      if (continuation.shouldResume()) {
-        checkArgument(
-            residualRestriction != null,
-            "residual restriction must not be null if continuation indicate it should resume");
-      }
       this.residualRestriction = residualRestriction;
       this.futureOutputWatermark = futureOutputWatermark;
       this.futureWatermarkEstimatorState = futureWatermarkEstimatorState;
 
 Review comment:
   I believe the comment below could be incorrect. If we get stop(), we shouldn't have a residual restriction.


[GitHub] [beam] boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r408483552
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 ##########
 @@ -49,13 +50,22 @@ public OffsetRange currentRestriction() {
 
   @Override
   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8872): Add support for splitting off a fixed amount of work for this restriction
-    // instead of only supporting checkpointing.
-
-    checkState(
-        lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed");
-    OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
-    this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
+    checkState(lastClaimedOffset != null, "Can't split before any offset was successfully claimed");
+    // No more split should be performed if checkpoint has happened.
+    if (checkpointed) {
+      return null;
+    }
+    Long splitPos =
+        lastClaimedOffset
+            + Math.max(1L, (long) ((range.getTo() - lastClaimedOffset) * fractionOfRemainder));
+    if (splitPos >= range.getTo()) {
+      return null;
+    }
+    if (fractionOfRemainder == 0.0) {
+      checkpointed = true;
 
 Review comment:
   It just lets us return early, since we know no further split can happen after checkpointing.


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409819571
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 ##########
 @@ -19,6 +19,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.io.range.OffsetRange;
 
 Review comment:
   Can we add tests to verify trySplit(0), trySplit(0.1), and trySplit(1) on an empty range like [100, 100)?
   
   Can we also add tests to verify the behavior of trySplit(1) on the range [100, 200)?


[GitHub] [beam] boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409826636
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 ##########
 @@ -210,9 +210,10 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
         // the call says that not the whole restriction has been processed. So we need to take
         // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly
         // the work that was done in the current ProcessElement call, and returns a residual
-        // restriction that describes exactly the work that wasn't done in the current call.
+        // restriction that describes exactly the work that wasn't done in the current call. The
+        // residual is null when the entire restriction has been processed.
         if (processContext.numClaimedBlocks > 0) {
-          residual = checkNotNull(processContext.takeCheckpointNow());
+          residual = processContext.takeCheckpointNow();
 
 Review comment:
   I guess the original assumption was that a checkpoint should happen only after at least one `tryClaim` call. Since we are changing that assumption, `numClaimedBlocks` can also be removed.


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409806889
 
 

 ##########
 File path: runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 ##########
 @@ -210,9 +210,10 @@ public FinishBundleContext finishBundleContext(DoFn<InputT, OutputT> doFn) {
         // the call says that not the whole restriction has been processed. So we need to take
         // a checkpoint now: checkpoint() guarantees that the primary restriction describes exactly
         // the work that was done in the current ProcessElement call, and returns a residual
-        // restriction that describes exactly the work that wasn't done in the current call.
+        // restriction that describes exactly the work that wasn't done in the current call. The
+        // residual is null when the entire restriction has been processed.
         if (processContext.numClaimedBlocks > 0) {
-          residual = checkNotNull(processContext.takeCheckpointNow());
+          residual = processContext.takeCheckpointNow();
 
 Review comment:
   takeCheckpointNow should work regardless of whether numClaimedBlocks > 0.
   
   Even if tryClaim never happens, the watermark may advance.


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r409844505
 
 

 ##########
 File path: sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTrackerTest.java
 ##########
 @@ -19,6 +19,7 @@
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import org.apache.beam.sdk.io.range.OffsetRange;
 
 Review comment:
   Yes, you're right. I need more sleep.


[GitHub] [beam] lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418#discussion_r408473046
 
 

 ##########
 File path: sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
 ##########
 @@ -49,13 +50,22 @@ public OffsetRange currentRestriction() {
 
   @Override
   public SplitResult<OffsetRange> trySplit(double fractionOfRemainder) {
-    // TODO(BEAM-8872): Add support for splitting off a fixed amount of work for this restriction
-    // instead of only supporting checkpointing.
-
-    checkState(
-        lastClaimedOffset != null, "Can't checkpoint before any offset was successfully claimed");
-    OffsetRange res = new OffsetRange(lastClaimedOffset + 1, range.getTo());
-    this.range = new OffsetRange(range.getFrom(), lastClaimedOffset + 1);
+    checkState(lastClaimedOffset != null, "Can't split before any offset was successfully claimed");
 
 Review comment:
   We can split before any block has been successfully claimed by returning `[from, to)` and updating the current range to be `[from, from)`.
   
   This makes sense in some cases where we want to hand off all the work for the active element to someone else while this Bundle finishes other processing.
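One way to picture the suggestion above is the sketch below. It is an assumption-laden illustration, not the merged Beam code: `UnclaimedSplitSketch` is a hypothetical class, and treating "nothing claimed yet" as if the position just before `from` had been claimed is a choice made here so that a fraction of 0 produces the residual `[from, to)` and the primary `[from, from)`.

```java
/** Hypothetical sketch of splitting before any claim; names are illustrative, not Beam's API. */
final class UnclaimedSplitSketch {
  private final long from;
  private long to;
  private Long lastClaimedOffset; // null until tryClaim succeeds

  UnclaimedSplitSketch(long from, long to) {
    this.from = from;
    this.to = to;
  }

  boolean tryClaim(long offset) {
    if (offset < from || offset >= to) {
      return false;
    }
    lastClaimedOffset = offset;
    return true;
  }

  /** Current (primary) range as {from, to}. */
  long[] currentRange() {
    return new long[] {from, to};
  }

  /** Returns the residual as {from, to}, or null when there is nothing to split off. */
  long[] trySplit(double fractionOfRemainder) {
    // Assumption: with no claim yet, act as if the offset just before `from` was claimed.
    // (This sketch ignores overflow when `from` is Long.MIN_VALUE.)
    long cur = (lastClaimedOffset == null) ? from - 1 : lastClaimedOffset;
    long splitPos = cur + Math.max(1L, (long) ((to - cur) * fractionOfRemainder));
    if (splitPos >= to) {
      return null;
    }
    long[] residual = {splitPos, to};
    to = splitPos; // the primary becomes [from, splitPos)
    return residual;
  }
}
```

Under these assumptions, calling `trySplit(0.0)` on an untouched [100, 200) hands off [100, 200) and leaves [100, 100) behind, while any fraction on the empty range [100, 100) returns null because the computed split position is never below `to`.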


[GitHub] [beam] lukecwik merged pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker

Posted by GitBox <gi...@apache.org>.
lukecwik merged pull request #11418: [BEAM-8872] Support split at fraction for OffsetRangeTracker
URL: https://github.com/apache/beam/pull/11418
 
 
   
