Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/27 23:37:46 UTC

[GitHub] [beam] boyuanzz opened a new pull request #12710: Refactor split logic to reuse common logic.

boyuanzz opened a new pull request #12710:
URL: https://github.com/apache/beam/pull/12710




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] y1chi commented on a change in pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
y1chi commented on a change in pull request #12710:
URL: https://github.com/apache/beam/pull/12710#discussion_r481585635



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1514,6 +1409,19 @@ public Object restriction() {
                   stopWindowIndex,
                   elementSplit,
                   watermarkAndState);
+        } else if (downstreamSplitResult != null) {

Review comment:
       It seems that the whole if-else branch could be simplified to just:
   ```
   windowedSplitResult =
       computeWindowSplitResult(
           currentElement,
           currentRestriction,
           currentWindow,
           windows,
           currentWatermarkEstimatorState,
           (elementSplit == null && downstreamSplitResult == null)
               ? currentWindowIndex
               : newWindowStopIndex,
           newWindowStopIndex,
           stopWindowIndex,
           elementSplit,
           watermarkAndState);
   ```
       Or maybe I missed something.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1546,50 +1459,32 @@ public Object restriction() {
               currentWindowIndex,
               stopWindowIndex,
               stopWindowIndex,
-              splitResult,
+              elementSplitResult,
               watermarkAndState);
     }
-    return KV.of(windowedSplitResult, newWindowStopIndex);
+    return SplitResultsWithStopIndex.of(
+        windowedSplitResult, downstreamSplitResult, newWindowStopIndex);
   }
 
-  private HandlesSplits.SplitResult trySplitForElementAndRestriction(
-      double fractionOfRemainder, Duration resumeDelay) {
-    KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
-    WindowedSplitResult windowedSplitResult = null;
-    synchronized (splitLock) {
-      // There is nothing to split if we are between element and restriction processing calls.
-      if (currentTracker == null) {
-        return null;
-      }
-      // Make sure to get the output watermark before we split to ensure that the lower bound
-      // applies to the residual.
-      watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
-      KV<WindowedSplitResult, Integer> splitResult =
-          trySplitForProcess(
-              currentElement,
-              currentRestriction,
-              currentWindow,
-              currentWindows,
-              currentWatermarkEstimatorState,
-              fractionOfRemainder,
-              currentTracker,
-              watermarkAndState,
-              windowCurrentIndex,
-              windowStopIndex);
-      if (splitResult == null) {
-        return null;
-      }
-      windowStopIndex = splitResult.getValue();
-      windowedSplitResult =
-          calculateRestrictionSize(
-              splitResult.getKey(),
-              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
-                  + "/GetSize");
-    }
-
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult(
+      WindowedSplitResult windowedSplitResult,
+      HandlesSplits.SplitResult downstreamElementSplit,
+      Coder fullInputCoder,
+      Instant initialWatermark,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
+      String pTransformId,
+      String mainInputId,
+      Collection<String> outputIds,
+      Duration resumeDelay) {
+    // The element split cannot come from both windowedSplitResult and downstreamElementSplit.
+    checkArgument(

Review comment:
       Can both `windowedSplitResult` and `downstreamElementSplit` be null? If not, should the check be expressed with xor (`^`)?

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -3858,156 +3924,225 @@ public void testScaleProgress() throws Exception {
       assertEquals(8, scaledResult.getWorkRemaining(), 0.0);
     }
 
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithNullTrackerAndSplitDelegate()
+        throws Exception {
+      expected.expect(IllegalArgumentException.class);
+      FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+          currentElement,
+          currentRestriction,
+          window1,
+          ImmutableList.copyOf(currentElement.getWindows()),
+          currentWatermarkEstimatorState,
+          0.0,
+          null,
+          null,
+          null,
+          0,
+          3);
+    }
+
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithNotNullTrackerAndDelegate()
+        throws Exception {
+      expected.expect(IllegalArgumentException.class);
+      FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+          currentElement,
+          currentRestriction,
+          window1,
+          ImmutableList.copyOf(currentElement.getWindows()),
+          currentWatermarkEstimatorState,
+          0.0,
+          new OffsetRangeTracker(currentRestriction),
+          createSplitDelegate(0.3, 0.0, null),
+          null,
+          0,
+          3);
+    }
+
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithInvalidWatermarkAndState()
+        throws Exception {
+      expected.expect(NullPointerException.class);

Review comment:
       I'm curious why we don't throw a different exception with a reason instead.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1444,28 +1317,44 @@ public Object restriction() {
   }
 
   @VisibleForTesting
-  static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> trySplitForProcess(
+  static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcessOrTruncate(
       WindowedValue currentElement,
       Object currentRestriction,
       BoundedWindow currentWindow,
       List<BoundedWindow> windows,
       WatermarkEstimatorStateT currentWatermarkEstimatorState,
       double fractionOfRemainder,
       RestrictionTracker currentTracker,
+      HandlesSplits splitDelegate,
       KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
       int currentWindowIndex,
       int stopWindowIndex) {
+    // We should only have currentTracker or splitDelegate.
+    checkArgument(

Review comment:
       Could this be `(currentTracker == null) ^ (splitDelegate == null)`?
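
       As a minimal sketch of the xor check suggested here: the class `ExactlyOneCheck`, its helper methods, and the error message are hypothetical and are not taken from `FnApiDoFnRunner`; the local `checkArgument` is only a stand-in for Guava's `Preconditions.checkArgument`. It rejects both the "both null" and the "both non-null" cases, which matches the two tests in this PR that expect `IllegalArgumentException`.

```java
// Hypothetical sketch; none of these names come from FnApiDoFnRunner.
final class ExactlyOneCheck {
  private ExactlyOneCheck() {}

  /** Local stand-in for Guava's Preconditions.checkArgument. */
  static void checkArgument(boolean condition, String message) {
    if (!condition) {
      throw new IllegalArgumentException(message);
    }
  }

  /** Passes only when exactly one of the two references is null. */
  static void checkExactlyOne(Object currentTracker, Object splitDelegate) {
    checkArgument(
        (currentTracker == null) ^ (splitDelegate == null),
        "Expected exactly one of currentTracker or splitDelegate to be set.");
  }

  /** Returns true if the check rejects the given pair of arguments. */
  static boolean rejects(Object currentTracker, Object splitDelegate) {
    try {
      checkExactlyOne(currentTracker, splitDelegate);
      return false;
    } catch (IllegalArgumentException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    Object tracker = new Object();
    Object delegate = new Object();
    System.out.println("both null rejected:     " + rejects(null, null));
    System.out.println("both non-null rejected: " + rejects(tracker, delegate));
    System.out.println("tracker only rejected:  " + rejects(tracker, null));
    System.out.println("delegate only rejected: " + rejects(null, delegate));
  }
}
```

       A single xor expression covers both illegal combinations, so no separate "neither is set" check is needed.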







[GitHub] [beam] boyuanzz commented on pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12710:
URL: https://github.com/apache/beam/pull/12710#issuecomment-685951473


   Thanks for your review! I'm going to merge the PR.





[GitHub] [beam] boyuanzz commented on pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12710:
URL: https://github.com/apache/beam/pull/12710#issuecomment-685034420


   r: @y1chi 
   cc: @lukecwik 





[GitHub] [beam] boyuanzz merged pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #12710:
URL: https://github.com/apache/beam/pull/12710


   





[GitHub] [beam] boyuanzz commented on a change in pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12710:
URL: https://github.com/apache/beam/pull/12710#discussion_r482267362



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1546,50 +1459,32 @@ public Object restriction() {
               currentWindowIndex,
               stopWindowIndex,
               stopWindowIndex,
-              splitResult,
+              elementSplitResult,
               watermarkAndState);
     }
-    return KV.of(windowedSplitResult, newWindowStopIndex);
+    return SplitResultsWithStopIndex.of(
+        windowedSplitResult, downstreamSplitResult, newWindowStopIndex);
   }
 
-  private HandlesSplits.SplitResult trySplitForElementAndRestriction(
-      double fractionOfRemainder, Duration resumeDelay) {
-    KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
-    WindowedSplitResult windowedSplitResult = null;
-    synchronized (splitLock) {
-      // There is nothing to split if we are between element and restriction processing calls.
-      if (currentTracker == null) {
-        return null;
-      }
-      // Make sure to get the output watermark before we split to ensure that the lower bound
-      // applies to the residual.
-      watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
-      KV<WindowedSplitResult, Integer> splitResult =
-          trySplitForProcess(
-              currentElement,
-              currentRestriction,
-              currentWindow,
-              currentWindows,
-              currentWatermarkEstimatorState,
-              fractionOfRemainder,
-              currentTracker,
-              watermarkAndState,
-              windowCurrentIndex,
-              windowStopIndex);
-      if (splitResult == null) {
-        return null;
-      }
-      windowStopIndex = splitResult.getValue();
-      windowedSplitResult =
-          calculateRestrictionSize(
-              splitResult.getKey(),
-              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
-                  + "/GetSize");
-    }
-
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult(
+      WindowedSplitResult windowedSplitResult,
+      HandlesSplits.SplitResult downstreamElementSplit,
+      Coder fullInputCoder,
+      Instant initialWatermark,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
+      String pTransformId,
+      String mainInputId,
+      Collection<String> outputIds,
+      Duration resumeDelay) {
+    // The element split cannot come from both windowedSplitResult and downstreamElementSplit.
+    checkArgument(

Review comment:
       Yeah, both `windowedSplitResult.getResidualSplitRoot` and `downstreamElementSplit` can be null. The only illegal state here is when both of them are non-null.
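
       To illustrate why xor would be too strict for this check, here is a small sketch comparing the two predicates. The class and method names are hypothetical and are not part of the PR; the parameters stand in for the two possible sources of the element split.

```java
// Hypothetical sketch; names are illustrative and not taken from the PR.
final class AtMostOneCheck {
  private AtMostOneCheck() {}

  /** True unless both references are non-null (the only illegal state described above). */
  static boolean atMostOneNonNull(Object a, Object b) {
    return a == null || b == null;
  }

  /** True only when exactly one reference is non-null (the xor form). */
  static boolean exactlyOneNonNull(Object a, Object b) {
    return (a == null) ^ (b == null);
  }

  public static void main(String[] args) {
    Object x = new Object();
    Object y = new Object();
    // The two predicates disagree only when both arguments are null.
    System.out.println("(null, null): " + atMostOneNonNull(null, null) + " vs " + exactlyOneNonNull(null, null));
    System.out.println("(x, null):    " + atMostOneNonNull(x, null) + " vs " + exactlyOneNonNull(x, null));
    System.out.println("(null, y):    " + atMostOneNonNull(null, y) + " vs " + exactlyOneNonNull(null, y));
    System.out.println("(x, y):       " + atMostOneNonNull(x, y) + " vs " + exactlyOneNonNull(x, y));
  }
}
```

       Because "no element split from either source" is a legal state, the "at most one" form is the one that accepts every valid input.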







[GitHub] [beam] boyuanzz commented on pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12710:
URL: https://github.com/apache/beam/pull/12710#issuecomment-685924306


   Run Spotless PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12710:
URL: https://github.com/apache/beam/pull/12710#discussion_r482279137



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1444,28 +1317,44 @@ public Object restriction() {
   }
 
   @VisibleForTesting
-  static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> trySplitForProcess(
+  static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcessOrTruncate(
       WindowedValue currentElement,
       Object currentRestriction,
       BoundedWindow currentWindow,
       List<BoundedWindow> windows,
       WatermarkEstimatorStateT currentWatermarkEstimatorState,
       double fractionOfRemainder,
       RestrictionTracker currentTracker,
+      HandlesSplits splitDelegate,
       KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
       int currentWindowIndex,
       int stopWindowIndex) {
+    // We should only have currentTracker or splitDelegate.
+    checkArgument(

Review comment:
       That's a good idea. Thanks!







[GitHub] [beam] boyuanzz commented on a change in pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12710:
URL: https://github.com/apache/beam/pull/12710#discussion_r482289095



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1514,6 +1409,19 @@ public Object restriction() {
                   stopWindowIndex,
                   elementSplit,
                   watermarkAndState);
+        } else if (downstreamSplitResult != null) {

Review comment:
       Yeah, you are right. Thanks!







[GitHub] [beam] boyuanzz commented on a change in pull request #12710: [BEAM-10303] Refactor split logic to reuse common logic.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12710:
URL: https://github.com/apache/beam/pull/12710#discussion_r482270933



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -3858,156 +3924,225 @@ public void testScaleProgress() throws Exception {
       assertEquals(8, scaledResult.getWorkRemaining(), 0.0);
     }
 
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithNullTrackerAndSplitDelegate()
+        throws Exception {
+      expected.expect(IllegalArgumentException.class);
+      FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+          currentElement,
+          currentRestriction,
+          window1,
+          ImmutableList.copyOf(currentElement.getWindows()),
+          currentWatermarkEstimatorState,
+          0.0,
+          null,
+          null,
+          null,
+          0,
+          3);
+    }
+
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithNotNullTrackerAndDelegate()
+        throws Exception {
+      expected.expect(IllegalArgumentException.class);
+      FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+          currentElement,
+          currentRestriction,
+          window1,
+          ImmutableList.copyOf(currentElement.getWindows()),
+          currentWatermarkEstimatorState,
+          0.0,
+          new OffsetRangeTracker(currentRestriction),
+          createSplitDelegate(0.3, 0.0, null),
+          null,
+          0,
+          3);
+    }
+
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithInvalidWatermarkAndState()
+        throws Exception {
+      expected.expect(NullPointerException.class);

Review comment:
       That's because I use `checkNotNull` in the function body; please refer to FnApiDoFnRunner L1338.
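
       For context, Guava's `Preconditions.checkNotNull` throws `NullPointerException` when its argument is null, which is why the test expects that exception type. The sketch below uses the JDK's `Objects.requireNonNull` as a stand-in to show that a null check of this kind can still carry a reason; the class name, method name, and message text are hypothetical and do not appear in the PR.

```java
import java.util.Objects;

// Hypothetical sketch; Objects.requireNonNull stands in for Guava's checkNotNull.
final class NullCheckDemo {
  private NullCheckDemo() {}

  /** Runs the null check and describes the outcome as a string. */
  static String describeCheck(Object watermarkAndState) {
    try {
      Objects.requireNonNull(watermarkAndState, "watermarkAndState must not be null");
      return "ok";
    } catch (NullPointerException e) {
      // The exception type is NullPointerException, and the message carries the reason.
      return e.getClass().getSimpleName() + ": " + e.getMessage();
    }
  }

  public static void main(String[] args) {
    System.out.println(describeCheck(new Object()));
    System.out.println(describeCheck(null));
  }
}
```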



