Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/07/30 07:29:04 UTC

[GitHub] [beam] boyuanzz opened a new pull request #12419: [WIP] [BEAM-10303] Handle split when truncate observes windows.

boyuanzz opened a new pull request #12419:
URL: https://github.com/apache/beam/pull/12419


   **Please** add a meaningful description for your change here
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
   
    - [ ] [**Choose reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more tips on [how to make review process smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] boyuanzz commented on pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12419:
URL: https://github.com/apache/beam/pull/12419#issuecomment-681191509


   Run Java PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464697946



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {

Review comment:
       That means [trySplitForElementAndRestriction](https://github.com/apache/beam/blob/fa3ca2b11e2ca031232245814389d29c805f79e7/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java#L1039) requires the same change.







[GitHub] [beam] boyuanzz merged pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz merged pull request #12419:
URL: https://github.com/apache/beam/pull/12419


   





[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r476125331



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())

Review comment:
       I think we should still use `transform id` and `main input id` from `process sized elements`. There are 3 possible cases:
   
   - We only have splits on window boundaries.
   - We only have an element split on the last window.
   - We have both element splits and window splits.
   
   If we chose `transform id` and `input id` from `truncate` for the element split, that would also be conceptually wrong. I prefer `process sized elements` because it simplifies the implementation on the runner side: for example, the runner doesn't need to distinguish window splits from element splits. If we treat `truncate` and `process` as a whole, it also makes sense to use the `transform id` from `process`.
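The three cases above hinge on how an element's windows are partitioned when the downstream `process` split succeeds while `truncate` is iterating windows. A minimal sketch of that partitioning with plain `java.util` types, mirroring the `Iterables.limit(..., currentWindowIterator.previousIndex())` and `ImmutableList.copyOf(currentWindowIterator)` calls in the diff. `WindowPartition` is hypothetical and windows are modeled as strings:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

/** Hypothetical illustration, not harness code: windows split around the current one. */
final class WindowPartition {
  final List<String> fullyProcessed; // candidate primary: windows already finished
  final String current;              // window whose element/restriction is split by process
  final List<String> unprocessed;    // candidate residual: windows not yet started

  private WindowPartition(List<String> fullyProcessed, String current, List<String> unprocessed) {
    this.fullyProcessed = fullyProcessed;
    this.current = current;
    this.unprocessed = unprocessed;
  }

  /**
   * Assumes it.next() has just returned the current window, so previousIndex() is its index.
   * Like the diff, this consumes the remaining windows from the iterator.
   */
  static WindowPartition partition(List<String> windows, ListIterator<String> it) {
    int currentIndex = it.previousIndex();
    List<String> done = new ArrayList<>(windows.subList(0, currentIndex));
    List<String> rest = new ArrayList<>();
    while (it.hasNext()) {
      rest.add(it.next());
    }
    return new WindowPartition(done, windows.get(currentIndex), rest);
  }
}
```

With the iterator positioned on `w1` of `[w0, w1, w2, w3]`, this yields fully processed windows `[w0]`, current window `w1`, and unprocessed windows `[w2, w3]`. An empty list on either side corresponds to the cases where only some of the roots exist.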







[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477460982



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -133,3333 +138,4337 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.collection.IsMapContaining;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)

Review comment:
       Sorry for the inconvenience. Do you prefer a separate commit or a followup PR?







[GitHub] [beam] boyuanzz commented on pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on pull request #12419:
URL: https://github.com/apache/beam/pull/12419#issuecomment-681203925


   Run Java PreCommit





[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464686872



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedResidualRoot.getApplication().getTransformId())
+              .setInputId(windowedResidualRoot.getApplication().getInputId())
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+      // We don't want to change the output watermarks or set the checkpoint resume time since
+      // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       > We should always be using the initial watermark state for the unprocessed windows
   
   Yes. My point is that we should consider setting the residual's watermarkHold to the initial watermark. Otherwise, the runner will use `MIN_TIMESTAMP` as the default, which may hold back the watermark unnecessarily.
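A minimal model of the concern above, assuming the runner derives a hold as the minimum of the residuals' output watermarks and substitutes `MIN_TIMESTAMP` when one is unset. `WatermarkHoldSketch`, `holdFor`, and the use of `long` millis are illustrative assumptions, not Beam or Dataflow APIs:

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical model of a runner deriving a watermark hold from residual roots. */
final class WatermarkHoldSketch {
  // Stand-in for the runner's default when a residual carries no output watermark.
  static final long MIN_TIMESTAMP = Long.MIN_VALUE;

  /**
   * Returns the minimum over all residuals, treating an unset watermark as MIN_TIMESTAMP.
   * An empty list returns Long.MAX_VALUE, meaning "no hold" in this model.
   */
  static long holdFor(List<Optional<Long>> residualOutputWatermarks) {
    long hold = Long.MAX_VALUE;
    for (Optional<Long> wm : residualOutputWatermarks) {
      hold = Math.min(hold, wm.orElse(MIN_TIMESTAMP));
    }
    return hold;
  }
}
```

In this model a single residual with no watermark pins the hold at `MIN_TIMESTAMP`, while supplying the initial watermark lets the hold track that value instead.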
   







[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464700690



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {

Review comment:
       I see, let's do the scale progress PR first. I'll refactor the code in https://github.com/apache/beam/pull/12430 a little bit.







[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477472322



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java
##########
@@ -35,6 +36,12 @@
   /** Returns the current progress of the active element as a fraction between 0.0 and 1.0. */
   double getProgress();
 
+  String getPtranformId();

Review comment:
       I believe we should be using the transform ids from the local transform. In the window observing truncate case where we have both element splits from `process` and window splits from `truncate` then:
   * the element splits should use the transform/input id from `process`
   * the whole window splits should use the transform/input id from `truncate`
   
   We may need changes in Dataflow Runner v2 to make sure it supports this.
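A sketch of the id assignment proposed above when both kinds of split occur. `SplitRootLabeling`, its `Ids` holder, and the string labels are hypothetical stand-ins for `BundleApplication` roots; whether a runner such as Dataflow Runner v2 accepts this mix is the open question raised in the comment:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of which transform/input ids each split root would carry. */
final class SplitRootLabeling {
  /** A (transform id, input id) pair; not a Beam type. */
  static final class Ids {
    final String transformId;
    final String inputId;

    Ids(String transformId, String inputId) {
      this.transformId = transformId;
      this.inputId = inputId;
    }

    @Override
    public String toString() {
      return transformId + "/" + inputId;
    }
  }

  /** Element-split roots use process's ids; whole-window roots use truncate's ids. */
  static List<String> label(
      boolean hasFullyProcessedWindows,
      boolean hasElementSplit,
      boolean hasUnprocessedWindows,
      Ids process,
      Ids truncate) {
    List<String> roots = new ArrayList<>();
    if (hasFullyProcessedWindows) {
      roots.add("primary windows -> " + truncate);
    }
    if (hasElementSplit) {
      roots.add("primary element -> " + process);
      roots.add("residual element -> " + process);
    }
    if (hasUnprocessedWindows) {
      roots.add("residual windows -> " + truncate);
    }
    return roots;
  }
}
```

The alternative argued earlier in the thread would instead return `process`'s ids for every root, so the runner never has to distinguish the two kinds.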
   







[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477022288



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1237,8 +1639,10 @@ public Object restriction() {
               .setTransformId(pTransformId)
               .setInputId(mainInputId)
               .setElement(bytesOut.toByteString());
-      // We don't want to change the output watermarks or set the checkpoint resume time since
-      // that applies to the current window.
+      if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) {

Review comment:
       `putAllOutputWatermarks` should do nothing if the input map is empty, so the `if` check is extraneous.
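`putAllOutputWatermarks` is generated protobuf builder code not shown in this thread. Assuming it follows the usual `java.util.Map.putAll` contract, a minimal sketch of why the guard is redundant, with a plain `HashMap` standing in for the builder's map field (`PutAllSketch` is hypothetical):

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for copying output watermarks into a builder's map field. */
final class PutAllSketch {
  static Map<String, Long> withWatermarks(Map<String, Long> existing, Map<String, Long> extra) {
    Map<String, Long> out = new HashMap<>(existing);
    // No isEmpty() guard needed: putAll with an empty map adds nothing.
    out.putAll(extra);
    return out;
  }
}
```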

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -133,3333 +138,4337 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.collection.IsMapContaining;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)

Review comment:
       This is very hard to review. Could we move the creation of the single enclosed class containing the existing tests into a separate commit?

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -247,8 +247,7 @@
   /** Only valid during {@code processElement...} methods, null otherwise. */
   private WindowedValue<InputT> currentElement;
 
-  /** Only valid during {@link #processElementForSizedElementAndRestriction}. */
-  private ListIterator<BoundedWindow> currentWindowIterator;
+  private List<BoundedWindow> currentWindows;

Review comment:
       Please add a comment stating the lifetime of `currentWindows`.
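   The sketch below shows one way the requested comment could read. It uses a hypothetical, heavily simplified runner (`WindowLifetimeSketch` is illustrative only) and assumes the field is set when an element starts processing and cleared when it finishes. The real field lives in `FnApiDoFnRunner`:
   
   ```java
   import java.util.List;
   
   // Hypothetical, simplified runner illustrating a documented per-element lifetime.
   final class WindowLifetimeSketch<W> {
     private final Object splitLock = new Object();
   
     /**
      * The windows of the element currently being processed. Assigned at the start of
      * processElement and reset to null in its finally block, so it is null between
      * elements. Guarded by splitLock so a concurrent split sees a consistent value.
      */
     private List<W> currentWindows;
   
     void processElement(List<W> windows, Runnable perWindowWork) {
       synchronized (splitLock) {
         currentWindows = windows;
       }
       try {
         for (int i = 0; i < windows.size(); i++) {
           perWindowWork.run();
         }
       } finally {
         synchronized (splitLock) {
           currentWindows = null;
         }
       }
     }
   
     /** Like the "nothing to split between calls" null check in the PR. */
     boolean canSplit() {
       synchronized (splitLock) {
         return currentWindows != null;
       }
     }
   }
   ```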

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java
##########
@@ -35,6 +36,12 @@
   /** Returns the current progress of the active element as a fraction between 0.0 and 1.0. */
   double getProgress();
 
+  String getPtranformId();

Review comment:
       I was under the impression that we would be able to pass this information forward locally through the method without needing to expose it within `HandlesSplits`.
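   This sketch shows the ids being passed forward locally. It assumes the caller already knows which transform and input the residual belongs to. `SplitOnlyDelegate`, `ApplicationSketch`, and `residualApplication` are hypothetical names, not Beam APIs:
   
   ```java
   import java.util.Optional;
   
   // Hypothetical stand-in for BundleApplication: only the fields discussed here.
   record ApplicationSketch(String transformId, String inputId, byte[] element) {}
   
   // Hypothetical narrowed delegate: splitting only, no transform/input id getters.
   interface SplitOnlyDelegate {
     Optional<byte[]> trySplitResidualBytes(double fractionOfRemainder);
   }
   
   final class PassIdsLocallySketch {
     // The ids arrive as method arguments from the code that wires the runner,
     // so the split interface does not need to expose them.
     static Optional<ApplicationSketch> residualApplication(
         SplitOnlyDelegate delegate, double fraction, String transformId, String mainInputId) {
       return delegate
           .trySplitResidualBytes(fraction)
           .map(bytes -> new ApplicationSketch(transformId, mainInputId, bytes));
     }
   }
   ```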

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1237,8 +1639,10 @@ public Object restriction() {
               .setTransformId(pTransformId)
               .setInputId(mainInputId)
               .setElement(bytesOut.toByteString());
-      // We don't want to change the output watermarks or set the checkpoint resume time since
-      // that applies to the current window.
+      if (!outputWatermarkMapForUnprocessedWindows.isEmpty()) {

Review comment:
       We should leave the comment since it still makes sense. We could update it to just state that we are using the initial watermark for the output watermarks.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+      WindowedSplitResult splitResult, String errorContext) {
+    double fullSize =
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+                && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return currentRestriction;
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double primarySize =
+        splitResult.getPrimarySplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getPrimarySplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double residualSize =
+        splitResult.getResidualSplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getResidualSplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    return WindowedSplitResult.forRoots(
+        splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+        splitResult.getPrimarySplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize),
+                splitResult.getPrimarySplitRoot().getTimestamp(),
+                splitResult.getPrimarySplitRoot().getWindows(),
+                splitResult.getPrimarySplitRoot().getPane()),
+        splitResult.getResidualSplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize),
+                splitResult.getResidualSplitRoot().getTimestamp(),
+                splitResult.getResidualSplitRoot().getWindows(),
+                splitResult.getResidualSplitRoot().getPane()),
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+                splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(

Review comment:
       We should call this something else, since `trySplitForTruncate` and `trySplitForProcess` should effectively be the same.
   
   Ditto for `trySplitForWindowObservingTruncate` and `trySplitForWindowObservingProcess`.
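   Here is one possible shape for the shared code. It assumes the two methods differ mainly in where their element progress comes from. The formula is copied from both methods in this diff, and `WindowSplitMath` is a hypothetical name:
   
   ```java
   // Hypothetical shared helper holding the window-boundary arithmetic that appears
   // identically in trySplitForTruncate and trySplitForProcess.
   final class WindowSplitMath {
     /** True when the split point lies past the remaining work of the current window. */
     static boolean splitsAtWindowBoundary(double scaledFractionOfRemainder, double workRemaining) {
       return scaledFractionOfRemainder >= workRemaining;
     }
   
     /**
      * New stop index for a window-boundary split: at least one window past the
      * current one, and never past the last window.
      */
     static int windowBoundaryStopIndex(
         double workCompleted,
         double workRemaining,
         double scaledFractionOfRemainder,
         int currentWindowIndex,
         int stopWindowIndex) {
       long windowsAhead =
           Math.max(
               1,
               Math.round(
                   (workCompleted + scaledFractionOfRemainder) / (workCompleted + workRemaining)));
       return (int) Math.min(stopWindowIndex - 1, currentWindowIndex + windowsAhead);
     }
   }
   ```
   
   Each caller would then only need to supply its own progress source: the downstream `getProgress()` for truncate, or a `HasProgress` tracker for process.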

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1264,17 +1668,21 @@ public Object restriction() {
             .setTransformId(pTransformId)
             .setInputId(mainInputId)
             .setElement(residualBytes.toByteString());
-
+    Map<String, org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp>
+        outputWatermarkMap = new HashMap<>();
     if (!watermarkAndState.getKey().equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
+      org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark =
+          org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
+              .setSeconds(watermarkAndState.getKey().getMillis() / 1000)
+              .setNanos((int) (watermarkAndState.getKey().getMillis() % 1000) * 1000000)
+              .build();
       for (String outputId : pTransform.getOutputsMap().keySet()) {
-        residualApplication.putOutputWatermarks(
-            outputId,
-            org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
-                .setSeconds(watermarkAndState.getKey().getMillis() / 1000)
-                .setNanos((int) (watermarkAndState.getKey().getMillis() % 1000) * 1000000)
-                .build());
+        outputWatermarkMap.put(outputId, outputWatermark);
       }
     }
+    if (!outputWatermarkMap.isEmpty()) {
+      residualApplication.putAllOutputWatermarks(outputWatermarkMap);

Review comment:
       Ditto: populating the output map from an empty input map should be a no-op, which makes the `if` check extraneous.
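   This sketch builds the residual's watermark map without the guard. To avoid depending on Beam, it takes the sentinel value and the output ids as arguments. `SecondsAndNanos` is a hypothetical stand-in for the protobuf `Timestamp`:
   
   ```java
   import java.time.Instant;
   import java.util.Collections;
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;
   
   // Hypothetical stand-in for protobuf Timestamp: just its two fields.
   record SecondsAndNanos(long seconds, int nanos) {}
   
   final class OutputWatermarksSketch {
     // minValueMillis plays the role of GlobalWindow.TIMESTAMP_MIN_VALUE (assumption).
     static Map<String, SecondsAndNanos> outputWatermarks(
         long watermarkMillis, long minValueMillis, Set<String> outputIds) {
       if (watermarkMillis == minValueMillis) {
         // No lower bound to report; an empty map makes the later putAll a no-op.
         return Collections.emptyMap();
       }
       Instant instant = Instant.ofEpochMilli(watermarkMillis);
       SecondsAndNanos watermark = new SecondsAndNanos(instant.getEpochSecond(), instant.getNano());
       Map<String, SecondsAndNanos> result = new HashMap<>();
       for (String outputId : outputIds) {
         result.put(outputId, watermark);
       }
       return result;
     }
   }
   ```
   
   The sketch uses `java.time.Instant` for the seconds/nanos split. For pre-epoch watermarks this keeps `nanos` non-negative, whereas `millis % 1000` would be negative.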

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+      WindowedSplitResult splitResult, String errorContext) {
+    double fullSize =
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+                && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return currentRestriction;
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double primarySize =
+        splitResult.getPrimarySplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getPrimarySplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double residualSize =
+        splitResult.getResidualSplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getResidualSplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    return WindowedSplitResult.forRoots(
+        splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+        splitResult.getPrimarySplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize),
+                splitResult.getPrimarySplitRoot().getTimestamp(),
+                splitResult.getPrimarySplitRoot().getWindows(),
+                splitResult.getPrimarySplitRoot().getPane()),
+        splitResult.getResidualSplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize),
+                splitResult.getResidualSplitRoot().getTimestamp(),
+                splitResult.getResidualSplitRoot().getWindows(),
+                splitResult.getResidualSplitRoot().getPane()),
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+                splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(
+          WindowedValue currentElement,
+          Object currentRestriction,
+          BoundedWindow currentWindow,
+          List<BoundedWindow> windows,
+          WatermarkEstimatorStateT currentWatermarkEstimatorState,
+          double fractionOfRemainder,
+          HandlesSplits splitDelegate,
+          int currentWindowIndex,
+          int stopWindowIndex) {
+    WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
+    int newWindowStopIndex = stopWindowIndex;
+    // If we are not on the last window, try to compute the split which is on the current window or
+    // on a future window.
+    if (currentWindowIndex != stopWindowIndex - 1) {
+      // Compute the fraction of the remainder relative to the scaled progress.
+      double elementCompleted = splitDelegate.getProgress();
+      Progress elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
+      Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
+      double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
+      // The fraction is out of the current window and hence we will split at the closest window
+      // boundary.
+      if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
+        newWindowStopIndex =
+            (int)
+                Math.min(
+                    stopWindowIndex - 1,
+                    currentWindowIndex
+                        + Math.max(
+                            1,
+                            Math.round(
+                                (elementProgress.getWorkCompleted() + scaledFractionOfRemainder)
+                                    / (elementProgress.getWorkCompleted()
+                                        + elementProgress.getWorkRemaining()))));
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                newWindowStopIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                null);
+
+      } else {
+        // Compute the downstream element split with the scaled fraction.
+        downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
+        newWindowStopIndex = currentWindowIndex + 1;
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                currentWindowIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                null);
+      }
+    } else {
+      // We are on the last window then compute the downstream element split with given fraction.
+      newWindowStopIndex = stopWindowIndex;
+      downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
+      // We cannot produce any split if the downstream is not splittable.
+      if (downstreamSplitResult == null) {
+        return null;
+      }
+      windowedSplitResult =
+          computeWindowSplitResult(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              windows,
+              currentWatermarkEstimatorState,
+              currentWindowIndex,
+              stopWindowIndex,
+              stopWindowIndex,
+              null,
+              null);
+    }
+    return KV.of(KV.of(windowedSplitResult, downstreamSplitResult), newWindowStopIndex);
+  }
+
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindow == null) {
+        return null;
+      }
+
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> result =
+          trySplitForTruncate(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              currentWindows,
+              currentWatermarkEstimatorState,
+              fractionOfRemainder,
+              splitDelegate,
+              windowCurrentIndex,
+              windowStopIndex);
+      if (result == null) {
+        return null;
+      }
+      windowStopIndex = result.getValue();
+      windowedSplitResult =
+          calculateRestrictionSize(
+              result.getKey().getKey(),
+              PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN + "/GetSize");
+      downstreamSplitResult = result.getKey().getValue();
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (windowedSplitResult != null
+        && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
+            primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(splitDelegate.getPtranformId())
+              .setInputId(splitDelegate.getMainInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (windowedSplitResult != null
+        && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            windowedSplitResult.getResidualInUnprocessedWindowsRoot(),
+            residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Map<String, org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp>
+          outputWatermarkMap = new HashMap<>();
+      if (!initialWatermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
+        org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark =
+            org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
+                .setSeconds(initialWatermark.getMillis() / 1000)
+                .setNanos((int) (initialWatermark.getMillis() % 1000) * 1000000)
+                .build();
+        for (String outputId : splitDelegate.getOutputIds()) {
+          outputWatermarkMap.put(outputId, outputWatermark);
+        }
+      }
+
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(splitDelegate.getPtranformId())
+              .setInputId(splitDelegate.getMainInputId())
+              .putAllOutputWatermarks(outputWatermarkMap)
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+
+      residualRoots.add(
+          DelayedBundleApplication.newBuilder()
+              .setApplication(residualApplicationInUnprocessedWindows)
+              .build());
+    }
+
+    if (downstreamSplitResult != null) {
+      primaryRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getPrimaryRoots()));
+      residualRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getResidualRoots()));
+    }
+
+    return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
+  }
+
+  private static <WatermarkEstimatorStateT> WindowedSplitResult computeWindowSplitResult(
+      WindowedValue currentElement,
+      Object currentRestriction,
+      BoundedWindow currentWindow,
+      List<BoundedWindow> windows,
+      WatermarkEstimatorStateT currentWatermarkEstimatorState,
+      int toIndex,
+      int fromIndex,
+      int stopWindowIndex,
+      SplitResult<?> splitResult,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState) {
+    List<BoundedWindow> primaryFullyProcessedWindows = windows.subList(0, toIndex);
+    List<BoundedWindow> residualUnprocessedWindows = windows.subList(fromIndex, stopWindowIndex);
+    WindowedSplitResult windowedSplitResult;
+
+    windowedSplitResult =
+        WindowedSplitResult.forRoots(
+            primaryFullyProcessedWindows.isEmpty()
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    primaryFullyProcessedWindows,
+                    currentElement.getPane()),
+            splitResult == null
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    currentWindow,
+                    currentElement.getPane()),
+            splitResult == null
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(splitResult.getResidual(), watermarkAndState.getValue())),
+                    currentElement.getTimestamp(),
+                    currentWindow,
+                    currentElement.getPane()),
+            residualUnprocessedWindows.isEmpty()
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    residualUnprocessedWindows,
+                    currentElement.getPane()));
+    return windowedSplitResult;
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> trySplitForProcess(
+      WindowedValue currentElement,
+      Object currentRestriction,
+      BoundedWindow currentWindow,
+      List<BoundedWindow> windows,
+      WatermarkEstimatorStateT currentWatermarkEstimatorState,
+      double fractionOfRemainder,
+      RestrictionTracker currentTracker,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
+      int currentWindowIndex,
+      int stopWindowIndex) {
+    WindowedSplitResult windowedSplitResult = null;
+    int newWindowStopIndex = stopWindowIndex;
+    // If we are not on the last window, try to compute the split which is on the current window or
+    // on a future window.
+    if (currentWindowIndex != stopWindowIndex - 1) {
+      // Compute the fraction of the remainder relative to the scaled progress.
+      Progress elementProgress;
+      if (currentTracker instanceof HasProgress) {
+        elementProgress = ((HasProgress) currentTracker).getProgress();
+      } else {
+        elementProgress = Progress.from(0, 1);
+      }
+      Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
+      double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
+
+      // The fraction is out of the current window and hence we will split at the closest window
+      // boundary.
+      if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
+        newWindowStopIndex =
+            (int)
+                Math.min(
+                    stopWindowIndex - 1,
+                    currentWindowIndex
+                        + Math.max(
+                            1,
+                            Math.round(
+                                (elementProgress.getWorkCompleted() + scaledFractionOfRemainder)
+                                    / (elementProgress.getWorkCompleted()
+                                        + elementProgress.getWorkRemaining()))));
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                newWindowStopIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                watermarkAndState);
+      } else {
+        // Compute the element split with the scaled fraction.
+        SplitResult<?> elementSplit =
+            currentTracker.trySplit(scaledFractionOfRemainder / elementProgress.getWorkRemaining());
+        newWindowStopIndex = currentWindowIndex + 1;
+        if (elementSplit != null) {
+          windowedSplitResult =
+              computeWindowSplitResult(
+                  currentElement,
+                  currentRestriction,
+                  currentWindow,
+                  windows,
+                  currentWatermarkEstimatorState,
+                  currentWindowIndex,
+                  newWindowStopIndex,
+                  stopWindowIndex,
+                  elementSplit,
+                  watermarkAndState);
+        } else {
+          windowedSplitResult =
+              computeWindowSplitResult(
+                  currentElement,
+                  currentRestriction,
+                  currentWindow,
+                  windows,
+                  currentWatermarkEstimatorState,
+                  newWindowStopIndex,
+                  newWindowStopIndex,
+                  stopWindowIndex,
+                  null,
+                  watermarkAndState);
+        }
+      }
+    } else {
+      // We are on the last window then compute the element split with given fraction.
+      newWindowStopIndex = stopWindowIndex;
+      SplitResult<?> splitResult = currentTracker.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+      windowedSplitResult =
+          computeWindowSplitResult(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              windows,
+              currentWatermarkEstimatorState,
+              currentWindowIndex,
+              stopWindowIndex,
+              stopWindowIndex,
+              splitResult,
+              watermarkAndState);
+    }
+    return KV.of(windowedSplitResult, newWindowStopIndex);
+  }
+
   private HandlesSplits.SplitResult trySplitForElementAndRestriction(
       double fractionOfRemainder, Duration resumeDelay) {
     KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
-    WindowedSplitResult windowedSplitResult;
+    WindowedSplitResult windowedSplitResult = null;
     synchronized (splitLock) {
       // There is nothing to split if we are between element and restriction processing calls.
       if (currentTracker == null) {
         return null;
       }
-
       // Make sure to get the output watermark before we split to ensure that the lower bound
       // applies to the residual.
       watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
-      SplitResult<RestrictionT> splitResult = currentTracker.trySplit(fractionOfRemainder);
+      KV<WindowedSplitResult, Integer> splitResult =

Review comment:
       You might want to use an `@AutoValue` for the return type to make it clear that the updated stop index is being returned.
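   Here is a sketch of such a value type. With AutoValue it would be an abstract class annotated `@AutoValue`, with abstract accessors and a static factory method. Because that requires the annotation processor, this sketch substitutes a Java record (Java 16+). `SplitAndStopIndex` is a hypothetical name:
   
   ```java
   // Named fields make it explicit that the updated stop index is returned,
   // unlike an anonymous KV<WindowedSplitResult, Integer>.
   record SplitAndStopIndex<T>(T windowedSplitResult, int newWindowStopIndex) {
     SplitAndStopIndex {
       if (newWindowStopIndex < 0) {
         throw new IllegalArgumentException("newWindowStopIndex must be non-negative");
       }
     }
   }
   ```
   
   A call site could then read `result.newWindowStopIndex()` instead of `result.getValue()`.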







[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r476125331



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())

Review comment:
       I think we should still use `transform id` and `main input id` from `process sized elements`. There are 3 possible cases:
   
   - We only have splits on window boundary
   - We only have element split on the last window
   - We have both element splits and window splits.
   
   If we choose the `transform id` and `input id` from `truncate` for the element split, that is also conceptually wrong. I prefer `process sized elements` because it simplifies the implementation on the runner side: for example, the runner doesn't need to distinguish window splits from element splits. If we treat `truncate` and `process` as a whole, it also makes sense to use the `transform id` from `process`.
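A minimal sketch of the three cases listed above, assuming a split is summarized by two flags: whether whole unprocessed windows moved to the residual, and whether the downstream process transform split the restriction inside the window being processed. `SplitKind` and `classify` are hypothetical names for illustration, not Beam APIs.

```java
/** Hypothetical classification of a split taken while truncate observes windows. */
enum SplitKind {
  NONE, // nothing could be split
  WINDOW_BOUNDARY_ONLY, // only whole unprocessed windows move to the residual
  ELEMENT_ONLY, // only the restriction of the window being processed is split
  WINDOW_AND_ELEMENT; // both of the above

  static SplitKind classify(boolean hasWindowSplit, boolean hasElementSplit) {
    if (hasWindowSplit && hasElementSplit) {
      return WINDOW_AND_ELEMENT;
    }
    if (hasWindowSplit) {
      return WINDOW_BOUNDARY_ONLY;
    }
    return hasElementSplit ? ELEMENT_ONLY : NONE;
  }
}
```

If every root uses the ids from `process sized elements`, a runner can handle all of these kinds the same way, which is the simplification being argued for.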




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477470639



##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -133,3333 +138,4337 @@
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
 import org.hamcrest.collection.IsMapContaining;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
+import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.runners.Enclosed;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /** Tests for {@link FnApiDoFnRunner}. */
-@RunWith(JUnit4.class)
+@RunWith(Enclosed.class)

Review comment:
       I did the review; we can disregard this comment.







[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464651698



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -515,15 +515,18 @@
               && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
+                  private final HandlesSplits splitDelegate =
+                      (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
       nit: here and below on 550
   ```suggestion
                         (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
   ```
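The nit matters because `Iterables.get(iterable, 0)` silently takes the first consumer even when there are several, while Guava's `Iterables.getOnlyElement` fails fast unless there is exactly one. A plain-Java sketch of that contract (the helper `onlyElement` is our own illustration, not Guava code):

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

final class OnlyElement {
  private OnlyElement() {}

  /** Mirrors the contract of Guava's Iterables.getOnlyElement: exactly one element, or fail. */
  static <T> T onlyElement(Iterable<T> iterable) {
    Iterator<T> it = iterable.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("expected one element, found none");
    }
    T first = it.next();
    if (it.hasNext()) {
      throw new IllegalArgumentException("expected one element, found more than one");
    }
    return first;
  }
}
```

With `getOnlyElement`, a wiring mistake that attaches a second consumer surfaces as an exception instead of a split delegate that quietly ignores the other consumers.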

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedResidualRoot.getApplication().getTransformId())

Review comment:
       Ditto: we should be using the `truncate` transform id and the `truncate` input id.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())

Review comment:
       We should be using the `truncate` transform id and not the `process sized elements` transform id.
   
   This removes the assumption about what the input coder is, and lets us append our additional primaries and residuals on top of any primaries/residuals added by the downstream split.
   
   Doing this might require fixing Dataflow runner v2.
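A sketch of the suggested composition, assuming a simplified stand-in for `BundleApplication` (the `Root` and `RootMerger` classes below are hypothetical, not the Fn API protos): the roots returned by the downstream split are kept unchanged, and one root per whole-window split is appended, addressed to the `truncate` transform and input so it is decoded with truncate's own input coder.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for a bundle application: the transform/input an element is for. */
final class Root {
  final String transformId;
  final String inputId;
  final String element; // encoded element; a String keeps the sketch small

  Root(String transformId, String inputId, String element) {
    this.transformId = transformId;
    this.inputId = inputId;
    this.element = element;
  }
}

final class RootMerger {
  private RootMerger() {}

  /** Keeps every downstream root and appends one truncate-addressed root per window split. */
  static List<Root> appendWindowRoots(
      List<Root> downstreamRoots,
      String truncateTransformId,
      String truncateInputId,
      List<String> windowSplitElements) {
    List<Root> merged = new ArrayList<>(downstreamRoots);
    for (String element : windowSplitElements) {
      merged.add(new Root(truncateTransformId, truncateInputId, element));
    }
    return Collections.unmodifiableList(merged);
  }
}
```

Because nothing from the downstream split is rewritten, this works regardless of how many primaries or residuals the downstream transform produced.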

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedResidualRoot.getApplication().getTransformId())
+              .setInputId(windowedResidualRoot.getApplication().getInputId())
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+      // We don't want to change the output watermarks or set the checkpoint resume time since
+      // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       We should always use the initial watermark estimator state for the unprocessed windows and not report any output watermark for these residual roots, since the watermark for those windows is unchanged.
   ```suggestion
   ```
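A sketch of that rule under simplified, hypothetical types (`ResidualRoot`, `forUnprocessedWindows`, windows modeled as `String` labels): the residual for windows that were never started carries the watermark estimator state the element arrived with, and leaves the output watermark unset instead of reporting the current window's watermark.

```java
import java.time.Instant;
import java.util.List;
import java.util.Optional;

/** Hypothetical residual root for windows that processing has not reached yet. */
final class ResidualRoot<S> {
  final List<String> windows;
  final S watermarkEstimatorState;
  final Optional<Instant> outputWatermark;

  private ResidualRoot(List<String> windows, S state, Optional<Instant> outputWatermark) {
    this.windows = windows;
    this.watermarkEstimatorState = state;
    this.outputWatermark = outputWatermark;
  }

  /**
   * Unprocessed windows restart from the element's initial watermark estimator state and
   * report no output watermark, because nothing in those windows has advanced yet.
   */
  static <T> ResidualRoot<T> forUnprocessedWindows(
      List<String> windows, T initialWatermarkEstimatorState) {
    return new ResidualRoot<>(windows, initialWatermarkEstimatorState, Optional.empty());
  }
}
```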

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1193,6 +1328,7 @@ public Object restriction() {
               .setElement(bytesOut.toByteString());
       // We don't want to change the output watermarks or set the checkpoint resume time since
       // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       Same as above.
   ```suggestion
   ```

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {

Review comment:
       We should be scaling the fraction of the remainder relative to the number of windows we have to find the best split point.
   
   This logic should essentially mirror https://github.com/apache/beam/blob/13c77a8f7a7c726aaa6dadad7812acdf8772ba7c/sdks/python/apache_beam/runners/common.py#L892 except that we will be using the `truncate` transform id, input id, and coder.
   
   I suggest following the approach used there: a static method that does all the heavy lifting, visible for testing so we can test it thoroughly, plus simple wrappers that pass all the necessary arguments forward to it.
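A minimal sketch of the scaling idea, written as the kind of static, easily tested method suggested above. It assumes each window counts as one unit of work and in-window progress is a fraction in [0, 1]; `WindowSplitSketch`, `Decision` and `decide` are hypothetical names, and this is not a transcription of the linked Python code. It only decides where to split: a boundary split keeps windows `[0, newStopWindowIndex)` in the primary, otherwise the rescaled fraction would be forwarded to the downstream `trySplit`.

```java
/** Hypothetical sketch of scaling a split fraction over several windows. */
final class WindowSplitSketch {
  private WindowSplitSketch() {}

  /** Outcome of the split decision. */
  static final class Decision {
    final boolean atWindowBoundary;
    final int newStopWindowIndex; // primary keeps windows [0, newStopWindowIndex)
    final double fractionForCurrentWindow; // only used for in-window (element) splits

    Decision(boolean atWindowBoundary, int newStopWindowIndex, double fractionForCurrentWindow) {
      this.atWindowBoundary = atWindowBoundary;
      this.newStopWindowIndex = newStopWindowIndex;
      this.fractionForCurrentWindow = fractionForCurrentWindow;
    }
  }

  /** Returns null when no split is possible because the split point is at the very end. */
  static Decision decide(
      int currentWindowIndex,
      int stopWindowIndex,
      double progressInCurrentWindow,
      double fractionOfRemainder) {
    // Scale progress so that every window is one unit of work.
    double completed = currentWindowIndex + progressInCurrentWindow;
    double remaining = stopWindowIndex - completed;
    double splitPoint = completed + fractionOfRemainder * remaining;

    if (splitPoint >= currentWindowIndex + 1) {
      // Past the current window: round down to a window boundary so the current
      // window stays whole in the primary.
      int newStop = (int) Math.floor(splitPoint);
      if (newStop >= stopWindowIndex) {
        return null; // the residual would be empty
      }
      return new Decision(true, newStop, 0.0);
    }
    // Inside the current window: all later windows go to the residual, and the fraction
    // is rescaled to what is left of the current window.
    double remainingInWindow = 1.0 - progressInCurrentWindow;
    double fraction = remainingInWindow <= 0 ? 0.0 : (splitPoint - completed) / remainingInWindow;
    return new Decision(false, currentWindowIndex + 1, fraction);
  }
}
```

For example, `decide(0, 4, 0.0, 0.5)` splits at the boundary before window 2, while `decide(0, 2, 0.0, 0.25)` stays inside window 0 and asks the downstream split for half of that window's remainder. Rounding down is one possible choice; the Python implementation may pick the boundary differently.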

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -1452,11 +1452,22 @@ public Instant getInitialWatermarkEstimatorState() {
   static class WindowObservingTestSplittableDoFn extends NonWindowObservingTestSplittableDoFn {
 
     private final PCollectionView<String> singletonSideInput;
+    private static final long PROCESSED_WINDOW = 1;
+    private boolean splitAtTruncate = false;
+    private long processedWindowCount = 0;

Review comment:
       ```suggestion
       private final boolean splitAtTruncate;
       private long processedWindowCount;
   ```

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -1452,11 +1452,22 @@ public Instant getInitialWatermarkEstimatorState() {
   static class WindowObservingTestSplittableDoFn extends NonWindowObservingTestSplittableDoFn {
 
     private final PCollectionView<String> singletonSideInput;
+    private static final long PROCESSED_WINDOW = 1;
+    private boolean splitAtTruncate = false;
+    private long processedWindowCount = 0;
 
     private WindowObservingTestSplittableDoFn(PCollectionView<String> singletonSideInput) {
       this.singletonSideInput = singletonSideInput;
     }
 
+    private static WindowObservingTestSplittableDoFn forSplitAtTruncate(
+        PCollectionView<String> singletonSideInput) {
+      WindowObservingTestSplittableDoFn doFn =
+          new WindowObservingTestSplittableDoFn(singletonSideInput);

Review comment:
       nit: update the constructor to take this parameter instead of mutating it on the instance.
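A sketch of the constructor-based shape suggested in the two nits above. To keep it self-contained, the real `PCollectionView<String>` is replaced by a `String` and the class is renamed `SplittableFixture`; both substitutions are assumptions for illustration. Java zero-initializes the `long` counter, so an explicit `= 0` is unnecessary, and a `final` `splitAtTruncate` fixes each instance's mode when it is built.

```java
/** Hypothetical test fixture: configuration goes through the constructor, never mutated later. */
final class SplittableFixture {
  private final String sideInputTag; // stands in for the PCollectionView<String> field
  private final boolean splitAtTruncate; // fixed at construction time
  private long processedWindowCount; // mutable test state, zero by default

  private SplittableFixture(String sideInputTag, boolean splitAtTruncate) {
    this.sideInputTag = sideInputTag;
    this.splitAtTruncate = splitAtTruncate;
  }

  static SplittableFixture create(String sideInputTag) {
    return new SplittableFixture(sideInputTag, false);
  }

  static SplittableFixture forSplitAtTruncate(String sideInputTag) {
    return new SplittableFixture(sideInputTag, true);
  }

  String sideInputTag() {
    return sideInputTag;
  }

  boolean splitAtTruncate() {
    return splitAtTruncate;
  }

  /** Records one processed window and returns the running count. */
  long recordProcessedWindow() {
    return ++processedWindowCount;
  }
}
```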

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -3013,19 +3069,107 @@ public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObser
             startFunctionRegistry,
             finishFunctionRegistry,
             teardownFunctions::add,
-            null /* addProgressRequestCallback */,
-            null /* bundleSplitListener */,
+            progressRequestCallbacks::add /* addProgressRequestCallback */,
+            splitListener /* bundleSplitListener */,
             null /* bundleFinalizer */);
+
+    assertThat(consumers.keySet(), containsInAnyOrder(inputPCollectionId, outputPCollectionId));
     FnDataReceiver<WindowedValue<?>> mainInput =
         consumers.getMultiplexingConsumer(inputPCollectionId);
     assertThat(mainInput, instanceOf(HandlesSplits.class));
 
-    assertEquals(0, ((HandlesSplits) mainInput).getProgress(), 0.0);
-    assertNull(((HandlesSplits) mainInput).trySplit(0.4));
+    mainOutputValues.clear();
+    BoundedWindow window1 = new IntervalWindow(new Instant(5), new Instant(10));
+    BoundedWindow window2 = new IntervalWindow(new Instant(6), new Instant(11));
+    BoundedWindow window3 = new IntervalWindow(new Instant(7), new Instant(12));
+    // Setup and launch the trySplit thread.
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    Future<HandlesSplits.SplitResult> trySplitFuture =
+        executorService.submit(
+            () -> {
+              try {
+                doFn.waitForSplitElementToBeProcessed();
+
+                return ((HandlesSplits) mainInput).trySplit(0);
+              } finally {
+                doFn.releaseWaitingProcessElementThread();
+              }
+            });
+
+    WindowedValue<?> splitValue =
+        valueInWindows(
+            KV.of(KV.of("7", KV.of(new OffsetRange(0, 6), GlobalWindow.TIMESTAMP_MIN_VALUE)), 6.0),
+            window1,
+            window2,
+            window3);
+    mainInput.accept(splitValue);
+    HandlesSplits.SplitResult trySplitResult = trySplitFuture.get();
+
+    // We expect that there are outputs from window1 and window2
+    assertThat(
+        mainOutputValues,
+        contains(
+            WindowedValue.of(
+                KV.of(
+                    KV.of("7", KV.of(new OffsetRange(0, 3), GlobalWindow.TIMESTAMP_MIN_VALUE)),
+                    3.0),
+                splitValue.getTimestamp(),
+                window1,
+                splitValue.getPane()),
+            WindowedValue.of(
+                KV.of(
+                    KV.of("7", KV.of(new OffsetRange(0, 3), GlobalWindow.TIMESTAMP_MIN_VALUE)),
+                    3.0),
+                splitValue.getTimestamp(),
+                window2,
+                splitValue.getPane())));
+
+    SplitResult expectedElementSplit = createSplitResult(0);

Review comment:
       We'll want to cover more scenarios than just this one; see the ones I added in https://github.com/apache/beam/commit/ac6d80ed0cd31ec2b5233d6b3805ffb3ad71a0f1#diff-1b0bb2e59974e0a12a61a762a9add92a
   
   This would likely require refactoring the trySplit method into a static method to which all the parameters are passed, so that we don't have to worry about all the additional setup and are only testing the core of the method. We can keep one variant that uses the current approach, which will help ensure that we are using the correct locks.
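A sketch of the structure suggested above, with hypothetical names: a pure static `computeStop` holds the logic that scenario tests would target, and a small instance wrapper is the only place that takes `splitLock`, which is what the single end-to-end variant would exercise. For brevity the static core only splits on whole windows after the current one, keeping the current window plus `floor(fraction * remainingWindows)` later windows in the primary; it is not the runner's actual split computation.

```java
/** Hypothetical split state; the real runner keeps more fields under its split lock. */
final class SplitState {
  private final Object splitLock = new Object();
  private int currentWindowIndex = -1; // -1 means "between elements": nothing to split
  private int stopWindowIndex;

  void startElement(int windowCount) {
    synchronized (splitLock) {
      currentWindowIndex = 0;
      stopWindowIndex = windowCount;
    }
  }

  /** Thin wrapper: reads shared state under the lock, then delegates to the pure helper. */
  Integer trySplitAtWindowBoundary(double fractionOfRemainder) {
    synchronized (splitLock) {
      if (currentWindowIndex < 0) {
        return null;
      }
      Integer newStop = computeStop(currentWindowIndex, stopWindowIndex, fractionOfRemainder);
      if (newStop != null) {
        stopWindowIndex = newStop; // later windows now belong to the residual
      }
      return newStop;
    }
  }

  /** Pure and static, so every scenario can be unit tested without any runner setup. */
  static Integer computeStop(int currentWindowIndex, int stopWindowIndex, double fraction) {
    int remainingWindows = stopWindowIndex - currentWindowIndex - 1; // windows after current
    int keep = (int) Math.floor(fraction * remainingWindows);
    int newStop = currentWindowIndex + 1 + keep;
    return newStop < stopWindowIndex ? newStop : null;
  }
}
```

Returning `null` follows the runner code above, where `trySplit` returns `null` when there is nothing to split.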







[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477462448



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java
##########
@@ -35,6 +36,12 @@
   /** Returns the current progress of the active element as a fraction between 0.0 and 1.0. */
   double getProgress();
 
+  String getPtranformId();

Review comment:
       Yes, we can pass this information locally for the current transform. But I also need this information from the `process` transform when the current transform is `truncate`. Whether this change is needed depends on which transform id and main input id we decide to use for splits from `truncate`. As I mentioned in a previous comment:
   I think we should still use transform id and main input id from process sized elements. There are 3 possible cases:
   
   - We only have splits on window boundary
   
   - We only have element split on the last window
   
   - We have both element splits and window splits.
   
   If we choose the transform id and input id from truncate for the element split, that is also conceptually wrong. I prefer process sized elements because it simplifies the implementation on the runner side: for example, the runner doesn't need to distinguish window splits from element splits. If we treat truncate and process as a whole, it also makes sense to use the transform id from process.







[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477554671



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  private WindowedSplitResult calculateRestrictionSize(
+      WindowedSplitResult splitResult, String errorContext) {
+    double fullSize =
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+                && splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return currentRestriction;
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double primarySize =
+        splitResult.getPrimarySplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getPrimarySplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    double residualSize =
+        splitResult.getResidualSplitRoot() == null
+            ? 0
+            : doFnInvoker.invokeGetSize(
+                new DelegatingArgumentProvider<InputT, OutputT>(processContext, errorContext) {
+                  @Override
+                  public Object restriction() {
+                    return ((KV<?, KV<?, ?>>) splitResult.getResidualSplitRoot().getValue())
+                        .getValue()
+                        .getKey();
+                  }
+
+                  @Override
+                  public RestrictionTracker<?, ?> restrictionTracker() {
+                    return doFnInvoker.invokeNewTracker(this);
+                  }
+                });
+    return WindowedSplitResult.forRoots(
+        splitResult.getPrimaryInFullyProcessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimaryInFullyProcessedWindowsRoot().getValue(), fullSize),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getTimestamp(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getWindows(),
+                splitResult.getPrimaryInFullyProcessedWindowsRoot().getPane()),
+        splitResult.getPrimarySplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getPrimarySplitRoot().getValue(), primarySize),
+                splitResult.getPrimarySplitRoot().getTimestamp(),
+                splitResult.getPrimarySplitRoot().getWindows(),
+                splitResult.getPrimarySplitRoot().getPane()),
+        splitResult.getResidualSplitRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualSplitRoot().getValue(), residualSize),
+                splitResult.getResidualSplitRoot().getTimestamp(),
+                splitResult.getResidualSplitRoot().getWindows(),
+                splitResult.getResidualSplitRoot().getPane()),
+        splitResult.getResidualInUnprocessedWindowsRoot() == null
+            ? null
+            : WindowedValue.of(
+                KV.of(splitResult.getResidualInUnprocessedWindowsRoot().getValue(), fullSize),
+                splitResult.getResidualInUnprocessedWindowsRoot().getTimestamp(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getWindows(),
+                splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(
+          WindowedValue currentElement,
+          Object currentRestriction,
+          BoundedWindow currentWindow,
+          List<BoundedWindow> windows,
+          WatermarkEstimatorStateT currentWatermarkEstimatorState,
+          double fractionOfRemainder,
+          HandlesSplits splitDelegate,
+          int currentWindowIndex,
+          int stopWindowIndex) {
+    WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
+    int newWindowStopIndex = stopWindowIndex;
+    // If we are not on the last window, try to compute the split which is on the current window or
+    // on a future window.
+    if (currentWindowIndex != stopWindowIndex - 1) {
+      // Compute the fraction of the remainder relative to the scaled progress.
+      double elementCompleted = splitDelegate.getProgress();
+      Progress elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
+      Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
+      double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
+      // The fraction is out of the current window and hence we will split at the closest window
+      // boundary.
+      if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
+        newWindowStopIndex =
+            (int)
+                Math.min(
+                    stopWindowIndex - 1,
+                    currentWindowIndex
+                        + Math.max(
+                            1,
+                            Math.round(
+                                (elementProgress.getWorkCompleted() + scaledFractionOfRemainder)
+                                    / (elementProgress.getWorkCompleted()
+                                        + elementProgress.getWorkRemaining()))));
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                newWindowStopIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                null);
+
+      } else {
+        // Compute the downstream element split with the scaled fraction.
+        downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
+        newWindowStopIndex = currentWindowIndex + 1;
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                currentWindowIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                null);
+      }
+    } else {
+      // We are on the last window, so compute the downstream element split with the given fraction.
+      newWindowStopIndex = stopWindowIndex;
+      downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
+      // We cannot produce any split if the downstream is not splittable.
+      if (downstreamSplitResult == null) {
+        return null;
+      }
+      windowedSplitResult =
+          computeWindowSplitResult(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              windows,
+              currentWatermarkEstimatorState,
+              currentWindowIndex,
+              stopWindowIndex,
+              stopWindowIndex,
+              null,
+              null);
+    }
+    return KV.of(KV.of(windowedSplitResult, downstreamSplitResult), newWindowStopIndex);
+  }
+
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    // Note that the assumption here is that the fullInputCoder of the Truncate transform is the
+    // same as that of the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindow == null) {
+        return null;
+      }
+
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> result =
+          trySplitForTruncate(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              currentWindows,
+              currentWatermarkEstimatorState,
+              fractionOfRemainder,
+              splitDelegate,
+              windowCurrentIndex,
+              windowStopIndex);
+      if (result == null) {
+        return null;
+      }
+      windowStopIndex = result.getValue();
+      windowedSplitResult =
+          calculateRestrictionSize(
+              result.getKey().getKey(),
+              PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN + "/GetSize");
+      downstreamSplitResult = result.getKey().getValue();
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (windowedSplitResult != null
+        && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
+            primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(splitDelegate.getPtranformId())
+              .setInputId(splitDelegate.getMainInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (windowedSplitResult != null
+        && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            windowedSplitResult.getResidualInUnprocessedWindowsRoot(),
+            residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      Map<String, org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp>
+          outputWatermarkMap = new HashMap<>();
+      if (!initialWatermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
+        org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark =
+            org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
+                .setSeconds(initialWatermark.getMillis() / 1000)
+                .setNanos((int) (initialWatermark.getMillis() % 1000) * 1000000)
+                .build();
+        for (String outputId : splitDelegate.getOutputIds()) {
+          outputWatermarkMap.put(outputId, outputWatermark);
+        }
+      }
+
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(splitDelegate.getPtranformId())
+              .setInputId(splitDelegate.getMainInputId())
+              .putAllOutputWatermarks(outputWatermarkMap)
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+
+      residualRoots.add(
+          DelayedBundleApplication.newBuilder()
+              .setApplication(residualApplicationInUnprocessedWindows)
+              .build());
+    }
+
+    if (downstreamSplitResult != null) {
+      primaryRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getPrimaryRoots()));
+      residualRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getResidualRoots()));
+    }
+
+    return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
+  }
+
+  private static <WatermarkEstimatorStateT> WindowedSplitResult computeWindowSplitResult(
+      WindowedValue currentElement,
+      Object currentRestriction,
+      BoundedWindow currentWindow,
+      List<BoundedWindow> windows,
+      WatermarkEstimatorStateT currentWatermarkEstimatorState,
+      int toIndex,
+      int fromIndex,
+      int stopWindowIndex,
+      SplitResult<?> splitResult,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState) {
+    List<BoundedWindow> primaryFullyProcessedWindows = windows.subList(0, toIndex);
+    List<BoundedWindow> residualUnprocessedWindows = windows.subList(fromIndex, stopWindowIndex);
+    WindowedSplitResult windowedSplitResult;
+
+    windowedSplitResult =
+        WindowedSplitResult.forRoots(
+            primaryFullyProcessedWindows.isEmpty()
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    primaryFullyProcessedWindows,
+                    currentElement.getPane()),
+            splitResult == null
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(splitResult.getPrimary(), currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    currentWindow,
+                    currentElement.getPane()),
+            splitResult == null
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(splitResult.getResidual(), watermarkAndState.getValue())),
+                    currentElement.getTimestamp(),
+                    currentWindow,
+                    currentElement.getPane()),
+            residualUnprocessedWindows.isEmpty()
+                ? null
+                : WindowedValue.of(
+                    KV.of(
+                        currentElement.getValue(),
+                        KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                    currentElement.getTimestamp(),
+                    residualUnprocessedWindows,
+                    currentElement.getPane()));
+    return windowedSplitResult;
+  }
+
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> trySplitForProcess(
+      WindowedValue currentElement,
+      Object currentRestriction,
+      BoundedWindow currentWindow,
+      List<BoundedWindow> windows,
+      WatermarkEstimatorStateT currentWatermarkEstimatorState,
+      double fractionOfRemainder,
+      RestrictionTracker currentTracker,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
+      int currentWindowIndex,
+      int stopWindowIndex) {
+    WindowedSplitResult windowedSplitResult = null;
+    int newWindowStopIndex = stopWindowIndex;
+    // If we are not on the last window, try to compute the split which is on the current window or
+    // on a future window.
+    if (currentWindowIndex != stopWindowIndex - 1) {
+      // Compute the fraction of the remainder relative to the scaled progress.
+      Progress elementProgress;
+      if (currentTracker instanceof HasProgress) {
+        elementProgress = ((HasProgress) currentTracker).getProgress();
+      } else {
+        elementProgress = Progress.from(0, 1);
+      }
+      Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
+      double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
+
+      // The fraction is out of the current window and hence we will split at the closest window
+      // boundary.
+      if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
+        newWindowStopIndex =
+            (int)
+                Math.min(
+                    stopWindowIndex - 1,
+                    currentWindowIndex
+                        + Math.max(
+                            1,
+                            Math.round(
+                                (elementProgress.getWorkCompleted() + scaledFractionOfRemainder)
+                                    / (elementProgress.getWorkCompleted()
+                                        + elementProgress.getWorkRemaining()))));
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                newWindowStopIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                null,
+                watermarkAndState);
+      } else {
+        // Compute the element split with the scaled fraction.
+        SplitResult<?> elementSplit =
+            currentTracker.trySplit(scaledFractionOfRemainder / elementProgress.getWorkRemaining());
+        newWindowStopIndex = currentWindowIndex + 1;
+        if (elementSplit != null) {
+          windowedSplitResult =
+              computeWindowSplitResult(
+                  currentElement,
+                  currentRestriction,
+                  currentWindow,
+                  windows,
+                  currentWatermarkEstimatorState,
+                  currentWindowIndex,
+                  newWindowStopIndex,
+                  stopWindowIndex,
+                  elementSplit,
+                  watermarkAndState);
+        } else {
+          windowedSplitResult =
+              computeWindowSplitResult(
+                  currentElement,
+                  currentRestriction,
+                  currentWindow,
+                  windows,
+                  currentWatermarkEstimatorState,
+                  newWindowStopIndex,
+                  newWindowStopIndex,
+                  stopWindowIndex,
+                  null,
+                  watermarkAndState);
+        }
+      }
+    } else {
+      // We are on the last window, so compute the element split with the given fraction.
+      newWindowStopIndex = stopWindowIndex;
+      SplitResult<?> splitResult = currentTracker.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+      windowedSplitResult =
+          computeWindowSplitResult(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              windows,
+              currentWatermarkEstimatorState,
+              currentWindowIndex,
+              stopWindowIndex,
+              stopWindowIndex,
+              splitResult,
+              watermarkAndState);
+    }
+    return KV.of(windowedSplitResult, newWindowStopIndex);
+  }
+
   private HandlesSplits.SplitResult trySplitForElementAndRestriction(
       double fractionOfRemainder, Duration resumeDelay) {
     KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
-    WindowedSplitResult windowedSplitResult;
+    WindowedSplitResult windowedSplitResult = null;
     synchronized (splitLock) {
       // There is nothing to split if we are between element and restriction processing calls.
       if (currentTracker == null) {
         return null;
       }
-
       // Make sure to get the output watermark before we split to ensure that the lower bound
       // applies to the residual.
       watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
-      SplitResult<RestrictionT> splitResult = currentTracker.trySplit(fractionOfRemainder);
+      KV<WindowedSplitResult, Integer> splitResult =

Review comment:
       Can we leave it as is for now? I'll update it together with the clean-up.
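In the diff above, `trySplitForWindowObservingTruncateRestriction` converts `initialWatermark` from Joda milliseconds into a vendored protobuf `Timestamp` using `/ 1000` and `% 1000`. The following is a minimal plain-Java sketch of the same seconds/nanos split that does not depend on protobuf. The class and method names are hypothetical. It uses `Math.floorDiv` and `Math.floorMod` because truncating `/` and `%` produce negative nanos for an instant before the epoch, and the protobuf `Timestamp` definition requires nanos to be in [0, 999,999,999].

```java
/**
 * Hypothetical helper (not part of the PR): splits epoch milliseconds into the
 * (seconds, nanos) pair that a protobuf Timestamp expects.
 */
final class MillisToTimestamp {

  /** Returns {seconds, nanos}; nanos is always in [0, 999_999_999]. */
  static long[] toSecondsAndNanos(long epochMillis) {
    // floorDiv/floorMod round toward negative infinity, so a pre-epoch instant gets a
    // more negative seconds value plus non-negative nanos that count forward in time.
    long seconds = Math.floorDiv(epochMillis, 1000L);
    long nanos = Math.floorMod(epochMillis, 1000L) * 1_000_000L;
    return new long[] {seconds, nanos};
  }

  public static void main(String[] args) {
    long[] positive = toSecondsAndNanos(1_500L); // 1.5 s after the epoch
    long[] negative = toSecondsAndNanos(-1_500L); // 1.5 s before the epoch
    System.out.println(positive[0] + "s " + positive[1] + "ns"); // 1s 500000000ns
    System.out.println(negative[0] + "s " + negative[1] + "ns"); // -2s 500000000ns
  }
}
```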




[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464699501



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {

Review comment:
       Yeah, that would make sense and would allow us to share a bunch of code between the truncate split logic and the process sized elements split logic.
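`trySplitForTruncate` and `trySplitForProcess` in the diff above repeat the same arithmetic to decide whether a split lands inside the current window or at a window boundary. That is the kind of code that could be shared. The following is a minimal sketch of that arithmetic as a standalone helper, under two assumptions:

- Progress is passed as plain `completed`/`remaining` doubles rather than a `Progress`.
- `scaleProgress`, whose body is not shown here, charges every later window as much work as the current one.

`WindowSplitMath` and `newWindowStopIndex` are hypothetical names. The return value `-1` is a sentinel meaning "delegate the split to the element".

```java
/**
 * Hypothetical helper (not part of the PR): the window-boundary arithmetic that
 * trySplitForTruncate and trySplitForProcess both perform.
 */
final class WindowSplitMath {

  /** Returns the new window stop index, or -1 if the element itself should be split. */
  static int newWindowStopIndex(
      double completed, double remaining, int currentIndex, int stopIndex, double fraction) {
    if (currentIndex >= stopIndex) {
      throw new IllegalArgumentException("currentIndex must be less than stopIndex");
    }
    if (currentIndex == stopIndex - 1) {
      return -1; // Last window: the element is split with the unscaled fraction.
    }
    // Assumed scaleProgress behaviour: every remaining window costs as much as this one.
    double perWindow = completed + remaining;
    double scaledRemaining = perWindow * (stopIndex - currentIndex - 1) + remaining;
    double scaledFraction = scaledRemaining * fraction;
    if (scaledFraction < remaining) {
      return -1; // The split point falls inside the current window.
    }
    // Round to the closest window boundary, keeping at least the current window in the
    // primary and leaving at least the last window in the residual.
    long windowsToKeep = Math.max(1, Math.round((completed + scaledFraction) / perWindow));
    return (int) Math.min(stopIndex - 1, currentIndex + windowsToKeep);
  }

  public static void main(String[] args) {
    // Halfway through window 0 of 4, asked to keep half of the remaining work.
    System.out.println(newWindowStopIndex(0.5, 0.5, 0, 4, 0.5)); // 2
    // A small fraction lands inside the current window.
    System.out.println(newWindowStopIndex(0.5, 0.5, 0, 4, 0.1)); // -1
  }
}
```

The two call sites would still differ in how they obtain `completed`/`remaining` (a `HasProgress` tracker versus `splitDelegate.getProgress()`) and in which object performs the element-level split.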




[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477498287



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(

Review comment:
       If we decide to unify `trySplitForTruncate` and `trySplitForProcess`, then it's natural to also unify `trySplitForWindowObservingTruncate` and `trySplitForWindowObservingProcess`.
   I tried to unify `trySplitForTruncate` and `trySplitForProcess`, and these were the concerns that stopped me:
   
   - To compute the scaled `fractionOfRemainder` and produce the element split, `trySplitForProcess` needs `currentRestrictionTracker`, whereas `trySplitForTruncate` needs `splitDelegate`. A unified method would need both a `RestrictionTracker` and a `HandlesSplits` in its parameter list, which is already very long. Its body would also need separate branches to get the element progress and the split from either `currentRestrictionTracker` or `splitDelegate`.
   
   - `trySplitForProcess` returns `KV<WindowedSplitResult, Integer>`, whereas `trySplitForTruncate` returns `KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer>`. A unified method would have to return a `<WindowedSplitResult, HandlesSplits.SplitResult, Integer>` triple. The caller would still need separate branches to construct the primary and residual roots.
   
   I'm OK with unifying them, but the approach available to us only works for splitting at `process` and `truncate`. I don't think it would extend to other kinds of splitting we might add later. I also feel it would make the logic, which is already complicated, harder to understand.
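Whichever way the follow-up goes, both `trySplitForTruncate` and `trySplitForProcess` already funnel into `computeWindowSplitResult`. That method partitions the element's windows with two `subList` calls:

- `[0, toIndex)` becomes the primary.
- `[fromIndex, stopWindowIndex)` becomes the residual.

The following is a minimal sketch of that partitioning, using strings as stand-ins for `BoundedWindow`. `WindowPartition` and its methods are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch (not part of the PR) of how computeWindowSplitResult partitions windows. */
final class WindowPartition {

  /** Windows the current bundle keeps, i.e. windows.subList(0, toIndex). */
  static <W> List<W> primaryFullyProcessed(List<W> windows, int toIndex) {
    return windows.subList(0, toIndex);
  }

  /** Windows returned to the runner untouched, i.e. windows.subList(fromIndex, stopIndex). */
  static <W> List<W> residualUnprocessed(List<W> windows, int fromIndex, int stopIndex) {
    return windows.subList(fromIndex, stopIndex);
  }

  public static void main(String[] args) {
    List<String> windows = Arrays.asList("w0", "w1", "w2", "w3");
    // Split at a window boundary (toIndex == fromIndex == 2): no element-level split.
    System.out.println(
        primaryFullyProcessed(windows, 2) + " | " + residualUnprocessed(windows, 2, 4));
    // Split inside current window w1 (toIndex = 1, fromIndex = 2): w1 is covered by the
    // element-level primary/residual roots instead, so it appears in neither list.
    System.out.println(
        primaryFullyProcessed(windows, 1) + " | " + residualUnprocessed(windows, 2, 4));
  }
}
```

Running `main` prints `[w0, w1] | [w2, w3]` and then `[w0] | [w2, w3]`.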




[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464699645



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1193,6 +1328,7 @@ public Object restriction() {
               .setElement(bytesOut.toByteString());
       // We don't want to change the output watermarks or set the checkpoint resume time since
       // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       resolved since this suggestion was not optimal.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1193,6 +1328,7 @@ public Object restriction() {
               .setElement(bytesOut.toByteString());
       // We don't want to change the output watermarks or set the checkpoint resume time since
       // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       resolved since this suggestion was not right.




[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477506877



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1080,136 +1121,484 @@ private static Progress scaleProgress(
     return Progress.from(completed, remaining);
   }
 
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT>
+      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(

Review comment:
       Since we have all the tests, I think we can merge it as is and explore clean-up options in a follow-up.
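To make the size bookkeeping in the hunk above easier to follow, here is a minimal, Beam-free sketch. It uses plain `Map.Entry` pairs in place of `KV` and `WindowedValue`. The names `SizedSplitSketch`, `SizedRoots`, `sized`, and `attachSizes` are hypothetical and are not harness APIs. The sketch shows the pattern the diff follows: the two roots that cover whole windows reuse `fullSize`, the element-level primary and residual get sizes computed from their own restrictions, and absent roots stay `null`.

```java
import java.util.AbstractMap;
import java.util.Map;
import java.util.function.ToDoubleFunction;

/** Hypothetical, Beam-free illustration of pairing the four split roots with sizes. */
final class SizedSplitSketch {

  /** Stand-in for WindowedSplitResult: each root is optional (null) and carries a size. */
  static final class SizedRoots<T> {
    final Map.Entry<T, Double> primaryInFullyProcessedWindows;
    final Map.Entry<T, Double> primarySplit;
    final Map.Entry<T, Double> residualSplit;
    final Map.Entry<T, Double> residualInUnprocessedWindows;

    SizedRoots(
        Map.Entry<T, Double> primaryInFullyProcessedWindows,
        Map.Entry<T, Double> primarySplit,
        Map.Entry<T, Double> residualSplit,
        Map.Entry<T, Double> residualInUnprocessedWindows) {
      this.primaryInFullyProcessedWindows = primaryInFullyProcessedWindows;
      this.primarySplit = primarySplit;
      this.residualSplit = residualSplit;
      this.residualInUnprocessedWindows = residualInUnprocessedWindows;
    }
  }

  /** Keeps absent roots absent, like the "root == null ? null : WindowedValue.of(...)" pattern. */
  static <T> Map.Entry<T, Double> sized(T root, double size) {
    return root == null ? null : new AbstractMap.SimpleImmutableEntry<T, Double>(root, size);
  }

  static <T> SizedRoots<T> attachSizes(
      T fullyProcessed,
      T primary,
      T residual,
      T unprocessed,
      double fullSize,
      ToDoubleFunction<T> sizeOf) {
    // Only invoke the (possibly expensive) size function for element-level roots that exist.
    double primarySize = primary == null ? 0 : sizeOf.applyAsDouble(primary);
    double residualSize = residual == null ? 0 : sizeOf.applyAsDouble(residual);
    // Whole-window roots reuse the full restriction size; element-level roots use their own.
    return new SizedRoots<>(
        sized(fullyProcessed, fullSize),
        sized(primary, primarySize),
        sized(residual, residualSize),
        sized(unprocessed, fullSize));
  }
}
```

Keeping the size computation next to the `null` checks means a missing root never triggers a size call, which matches how `residualSize` falls back to 0 in the diff.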
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464699157



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedResidualRoot.getApplication().getTransformId())
+              .setInputId(windowedResidualRoot.getApplication().getInputId())
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+      // We don't want to change the output watermarks or set the checkpoint resume time since
+      // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       That is a good point.
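The window bookkeeping in `trySplitForWindowObservingTruncateRestriction` can be sketched without Beam types. In this sketch, windows are plain values, and `WindowSplitSketch`, `Partition`, `partition`, and `fullSizeOrZero` are hypothetical names. The sketch relies on the same `ListIterator` property the diff uses: once `next()` has returned the current window, `previousIndex()` is that window's index. The prefix up to that index therefore holds only fully processed windows, and draining the iterator yields only unprocessed ones. It also mirrors the short-circuit that skips computing a size when there are no other windows.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;

/** Hypothetical, Beam-free illustration of partitioning an element's windows at split time. */
final class WindowSplitSketch {

  /** Windows before the active one, the active window, and windows after it. */
  static final class Partition<W> {
    final List<W> fullyProcessed; // would become the primary "fully processed windows" root
    final W current; // the window whose restriction the downstream SDF splits
    final List<W> unprocessed; // would become the residual "unprocessed windows" root

    Partition(List<W> fullyProcessed, W current, List<W> unprocessed) {
      this.fullyProcessed = fullyProcessed;
      this.current = current;
      this.unprocessed = unprocessed;
    }
  }

  static <W> Partition<W> partition(List<W> windows, int currentIndex) {
    if (currentIndex < 0 || currentIndex >= windows.size()) {
      throw new IllegalArgumentException("currentIndex out of range: " + currentIndex);
    }
    ListIterator<W> it = windows.listIterator();
    W current = null;
    for (int i = 0; i <= currentIndex; i++) {
      current = it.next(); // advance exactly as the per-window processing loop would
    }
    // previousIndex() is now the current window's index, so this prefix excludes it.
    List<W> before = new ArrayList<>(windows.subList(0, it.previousIndex()));
    // Draining the iterator consumes exactly the windows after the current one.
    List<W> after = new ArrayList<>();
    while (it.hasNext()) {
      after.add(it.next());
    }
    return new Partition<>(
        Collections.unmodifiableList(before), current, Collections.unmodifiableList(after));
  }

  /** Skips the size computation when there are no other windows to resubmit. */
  static double fullSizeOrZero(Partition<?> p, double restrictionSize) {
    return p.fullyProcessed.isEmpty() && p.unprocessed.isEmpty() ? 0 : restrictionSize;
  }
}
```

For windows `[w0, w1, w2, w3]` with `w2` active, the fully processed windows are `[w0, w1]` and the unprocessed window is `[w3]`.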







[GitHub] [beam] boyuanzz commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

Posted by GitBox <gi...@apache.org>.
boyuanzz commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r477618562



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/HandlesSplits.java
##########
@@ -35,6 +36,12 @@
   /** Returns the current progress of the active element as a fraction between 0.0 and 1.0. */
   double getProgress();
 
+  String getPtranformId();

Review comment:
       Discussed offline. We decided that window splits are populated with the truncate transform ID and element splits with the process transform ID.
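Here is a hypothetical sketch of that convention. It uses a minimal `Application` stand-in rather than the real `BundleApplication` proto, and `SplitTransformIdSketch` and `roots` are illustrative names only. Roots produced by splitting on window boundaries are attributed to the truncate transform. Roots produced by splitting the element's restriction keep the process transform's ID. The order of the returned roots is an arbitrary choice for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of which transform ID each kind of split root is attributed to. */
final class SplitTransformIdSketch {

  /** Minimal stand-in for a bundle application: target transform plus an encoded element. */
  static final class Application {
    final String transformId;
    final String element;

    Application(String transformId, String element) {
      this.transformId = transformId;
      this.element = element;
    }
  }

  /**
   * Builds the roots for one side of a split (primary or residual). Either input may be null when
   * that kind of split did not produce a root.
   */
  static List<Application> roots(
      String windowSplitRoot,
      String elementSplitRoot,
      String truncateTransformId,
      String processTransformId) {
    List<Application> out = new ArrayList<>();
    if (windowSplitRoot != null) {
      // Window-level roots are attributed to the truncate transform, per the convention above.
      out.add(new Application(truncateTransformId, windowSplitRoot));
    }
    if (elementSplitRoot != null) {
      // Element-level roots come from the downstream SDF split and keep the process transform ID.
      out.add(new Application(processTransformId, elementSplitRoot));
    }
    return out;
  }
}
```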



