You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/09/02 19:32:56 UTC

[beam] branch master updated: Refactor split logic to reuse common logic.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 02a6da9  Refactor split logic to reuse common logic.
     new b0abce2  Merge pull request #12710 from boyuanzz/split_refactor
02a6da9 is described below

commit 02a6da9ab3d59e6cba2530153e8ca5e7598d1174
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Aug 27 16:36:55 2020 -0700

    Refactor split logic to reuse common logic.
---
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 410 +++++-------
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       | 712 +++++++++++++++++----
 2 files changed, 758 insertions(+), 364 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 9c7ee2e..f77d8d6 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -984,6 +984,23 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     public abstract @Nullable WindowedValue getResidualInUnprocessedWindowsRoot();
   }
 
+  @AutoValue
+  abstract static class SplitResultsWithStopIndex {
+    public static SplitResultsWithStopIndex of(
+        WindowedSplitResult windowSplit,
+        HandlesSplits.SplitResult downstreamSplit,
+        int newWindowStopIndex) {
+      return new AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex(
+          windowSplit, downstreamSplit, newWindowStopIndex);
+    }
+
+    public abstract @Nullable WindowedSplitResult getWindowSplit();
+
+    public abstract HandlesSplits.@Nullable SplitResult getDownstreamSplit();
+
+    public abstract int getNewWindowStopIndex();
+  }
+
   private void processElementForWindowObservingSizedElementAndRestriction(
       WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
     currentElement = elem.withValue(elem.getValue().getKey().getKey());
@@ -1195,102 +1212,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                 splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
   }
 
-  @VisibleForTesting
-  static <WatermarkEstimatorStateT>
-      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(
-          WindowedValue currentElement,
-          Object currentRestriction,
-          BoundedWindow currentWindow,
-          List<BoundedWindow> windows,
-          WatermarkEstimatorStateT currentWatermarkEstimatorState,
-          double fractionOfRemainder,
-          HandlesSplits splitDelegate,
-          int currentWindowIndex,
-          int stopWindowIndex) {
-    WindowedSplitResult windowedSplitResult = null;
-    HandlesSplits.SplitResult downstreamSplitResult = null;
-    int newWindowStopIndex = stopWindowIndex;
-    // If we are not on the last window, try to compute the split which is on the current window or
-    // on a future window.
-    if (currentWindowIndex != stopWindowIndex - 1) {
-      // Compute the fraction of the remainder relative to the scaled progress.
-      double elementCompleted = splitDelegate.getProgress();
-      Progress elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
-      Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
-      double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
-      // The fraction is out of the current window and hence we will split at the closest window
-      // boundary.
-      if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
-        newWindowStopIndex =
-            (int)
-                Math.min(
-                    stopWindowIndex - 1,
-                    currentWindowIndex
-                        + Math.max(
-                            1,
-                            Math.round(
-                                (elementProgress.getWorkCompleted() + scaledFractionOfRemainder)
-                                    / (elementProgress.getWorkCompleted()
-                                        + elementProgress.getWorkRemaining()))));
-        windowedSplitResult =
-            computeWindowSplitResult(
-                currentElement,
-                currentRestriction,
-                currentWindow,
-                windows,
-                currentWatermarkEstimatorState,
-                newWindowStopIndex,
-                newWindowStopIndex,
-                stopWindowIndex,
-                null,
-                null);
-
-      } else {
-        // Compute the downstream element split with the scaled fraction.
-        downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
-        newWindowStopIndex = currentWindowIndex + 1;
-        windowedSplitResult =
-            computeWindowSplitResult(
-                currentElement,
-                currentRestriction,
-                currentWindow,
-                windows,
-                currentWatermarkEstimatorState,
-                currentWindowIndex,
-                newWindowStopIndex,
-                stopWindowIndex,
-                null,
-                null);
-      }
-    } else {
-      // We are on the last window then compute the downstream element split with given fraction.
-      newWindowStopIndex = stopWindowIndex;
-      downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
-      // We cannot produce any split if the downstream is not splittable.
-      if (downstreamSplitResult == null) {
-        return null;
-      }
-      windowedSplitResult =
-          computeWindowSplitResult(
-              currentElement,
-              currentRestriction,
-              currentWindow,
-              windows,
-              currentWatermarkEstimatorState,
-              currentWindowIndex,
-              stopWindowIndex,
-              stopWindowIndex,
-              null,
-              null);
-    }
-    return KV.of(KV.of(windowedSplitResult, downstreamSplitResult), newWindowStopIndex);
-  }
-
   private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
       double fractionOfRemainder, HandlesSplits splitDelegate) {
-    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
-    // the same as the SDF/Process transform.
-    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
     WindowedSplitResult windowedSplitResult = null;
     HandlesSplits.SplitResult downstreamSplitResult = null;
     synchronized (splitLock) {
@@ -1299,92 +1222,42 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
         return null;
       }
 
-      KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> result =
-          trySplitForTruncate(
+      SplitResultsWithStopIndex splitResult =
+          computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               currentWindow,
               currentWindows,
               currentWatermarkEstimatorState,
               fractionOfRemainder,
+              null,
               splitDelegate,
+              null,
               windowCurrentIndex,
               windowStopIndex);
-      if (result == null) {
+      if (splitResult == null) {
         return null;
       }
-      windowStopIndex = result.getValue();
+      windowStopIndex = splitResult.getNewWindowStopIndex();
       windowedSplitResult =
           calculateRestrictionSize(
-              result.getKey().getKey(),
+              splitResult.getWindowSplit(),
               PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN + "/GetSize");
-      downstreamSplitResult = result.getKey().getValue();
-    }
-
-    List<BundleApplication> primaryRoots = new ArrayList<>();
-    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
-
-    if (windowedSplitResult != null
-        && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
-      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
-      try {
-        fullInputCoder.encode(
-            windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
-            primaryInOtherWindowsBytes);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      BundleApplication.Builder primaryApplicationInOtherWindows =
-          BundleApplication.newBuilder()
-              .setTransformId(pTransformId)
-              .setInputId(mainInputId)
-              .setElement(primaryInOtherWindowsBytes.toByteString());
-      primaryRoots.add(primaryApplicationInOtherWindows.build());
-    }
-    if (windowedSplitResult != null
-        && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
-      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
-      try {
-        fullInputCoder.encode(
-            windowedSplitResult.getResidualInUnprocessedWindowsRoot(),
-            residualInUnprocessedWindowsBytesOut);
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-      // We don't want to change the output watermarks or set the checkpoint resume time since
-      // that applies to the current window.
-      Map<String, org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp>
-          outputWatermarkMap = new HashMap<>();
-      if (!initialWatermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
-        org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark =
-            org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
-                .setSeconds(initialWatermark.getMillis() / 1000)
-                .setNanos((int) (initialWatermark.getMillis() % 1000) * 1000000)
-                .build();
-        for (String outputId : pTransform.getOutputsMap().keySet()) {
-          outputWatermarkMap.put(outputId, outputWatermark);
-        }
-      }
-
-      BundleApplication.Builder residualApplicationInUnprocessedWindows =
-          BundleApplication.newBuilder()
-              .setTransformId(pTransformId)
-              .setInputId(mainInputId)
-              .putAllOutputWatermarks(outputWatermarkMap)
-              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
-
-      residualRoots.add(
-          DelayedBundleApplication.newBuilder()
-              .setApplication(residualApplicationInUnprocessedWindows)
-              .build());
+      downstreamSplitResult = splitResult.getDownstreamSplit();
     }
-
-    if (downstreamSplitResult != null) {
-      primaryRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getPrimaryRoots()));
-      residualRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getResidualRoots()));
-    }
-
-    return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    return constructSplitResult(
+        windowedSplitResult,
+        downstreamSplitResult,
+        fullInputCoder,
+        initialWatermark,
+        null,
+        pTransformId,
+        mainInputId,
+        pTransform.getOutputsMap().keySet(),
+        null);
   }
 
   private static <WatermarkEstimatorStateT> WindowedSplitResult computeWindowSplitResult(
@@ -1444,7 +1317,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   }
 
   @VisibleForTesting
-  static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> trySplitForProcess(
+  static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcessOrTruncate(
       WindowedValue currentElement,
       Object currentRestriction,
       BoundedWindow currentWindow,
@@ -1452,20 +1325,34 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
       WatermarkEstimatorStateT currentWatermarkEstimatorState,
       double fractionOfRemainder,
       RestrictionTracker currentTracker,
+      HandlesSplits splitDelegate,
       KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
       int currentWindowIndex,
       int stopWindowIndex) {
+    // We should only have currentTracker or splitDelegate.
+    checkArgument((currentTracker != null) ^ (splitDelegate != null));
+    // When we have currentTracker, the watermarkAndState should not be null.
+    if (currentTracker != null) {
+      checkNotNull(watermarkAndState);
+    }
+
     WindowedSplitResult windowedSplitResult = null;
+    HandlesSplits.SplitResult downstreamSplitResult = null;
     int newWindowStopIndex = stopWindowIndex;
     // If we are not on the last window, try to compute the split which is on the current window or
     // on a future window.
     if (currentWindowIndex != stopWindowIndex - 1) {
       // Compute the fraction of the remainder relative to the scaled progress.
       Progress elementProgress;
-      if (currentTracker instanceof HasProgress) {
-        elementProgress = ((HasProgress) currentTracker).getProgress();
+      if (currentTracker != null) {
+        if (currentTracker instanceof HasProgress) {
+          elementProgress = ((HasProgress) currentTracker).getProgress();
+        } else {
+          elementProgress = Progress.from(0, 1);
+        }
       } else {
-        elementProgress = Progress.from(0, 1);
+        double elementCompleted = splitDelegate.getProgress();
+        elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
       }
       Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
       double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
@@ -1498,42 +1385,42 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                 watermarkAndState);
       } else {
         // Compute the element split with the scaled fraction.
-        SplitResult<?> elementSplit =
-            currentTracker.trySplit(scaledFractionOfRemainder / elementProgress.getWorkRemaining());
-        newWindowStopIndex = currentWindowIndex + 1;
-        if (elementSplit != null) {
-          windowedSplitResult =
-              computeWindowSplitResult(
-                  currentElement,
-                  currentRestriction,
-                  currentWindow,
-                  windows,
-                  currentWatermarkEstimatorState,
-                  currentWindowIndex,
-                  newWindowStopIndex,
-                  stopWindowIndex,
-                  elementSplit,
-                  watermarkAndState);
+        SplitResult<?> elementSplit = null;
+        if (currentTracker != null) {
+          elementSplit =
+              currentTracker.trySplit(
+                  scaledFractionOfRemainder / elementProgress.getWorkRemaining());
         } else {
-          windowedSplitResult =
-              computeWindowSplitResult(
-                  currentElement,
-                  currentRestriction,
-                  currentWindow,
-                  windows,
-                  currentWatermarkEstimatorState,
-                  newWindowStopIndex,
-                  newWindowStopIndex,
-                  stopWindowIndex,
-                  null,
-                  watermarkAndState);
+          downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
         }
+        newWindowStopIndex = currentWindowIndex + 1;
+        int toIndex =
+            (elementSplit == null && downstreamSplitResult == null)
+                ? newWindowStopIndex
+                : currentWindowIndex;
+        windowedSplitResult =
+            computeWindowSplitResult(
+                currentElement,
+                currentRestriction,
+                currentWindow,
+                windows,
+                currentWatermarkEstimatorState,
+                toIndex,
+                newWindowStopIndex,
+                stopWindowIndex,
+                elementSplit,
+                watermarkAndState);
       }
     } else {
       // We are on the last window then compute the element split with given fraction.
+      SplitResult<?> elementSplitResult = null;
       newWindowStopIndex = stopWindowIndex;
-      SplitResult<?> splitResult = currentTracker.trySplit(fractionOfRemainder);
-      if (splitResult == null) {
+      if (currentTracker != null) {
+        elementSplitResult = currentTracker.trySplit(fractionOfRemainder);
+      } else {
+        downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
+      }
+      if (elementSplitResult == null && downstreamSplitResult == null) {
         return null;
       }
       windowedSplitResult =
@@ -1546,50 +1433,32 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
               currentWindowIndex,
               stopWindowIndex,
               stopWindowIndex,
-              splitResult,
+              elementSplitResult,
               watermarkAndState);
     }
-    return KV.of(windowedSplitResult, newWindowStopIndex);
+    return SplitResultsWithStopIndex.of(
+        windowedSplitResult, downstreamSplitResult, newWindowStopIndex);
   }
 
-  private HandlesSplits.SplitResult trySplitForElementAndRestriction(
-      double fractionOfRemainder, Duration resumeDelay) {
-    KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
-    WindowedSplitResult windowedSplitResult = null;
-    synchronized (splitLock) {
-      // There is nothing to split if we are between element and restriction processing calls.
-      if (currentTracker == null) {
-        return null;
-      }
-      // Make sure to get the output watermark before we split to ensure that the lower bound
-      // applies to the residual.
-      watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
-      KV<WindowedSplitResult, Integer> splitResult =
-          trySplitForProcess(
-              currentElement,
-              currentRestriction,
-              currentWindow,
-              currentWindows,
-              currentWatermarkEstimatorState,
-              fractionOfRemainder,
-              currentTracker,
-              watermarkAndState,
-              windowCurrentIndex,
-              windowStopIndex);
-      if (splitResult == null) {
-        return null;
-      }
-      windowStopIndex = splitResult.getValue();
-      windowedSplitResult =
-          calculateRestrictionSize(
-              splitResult.getKey(),
-              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
-                  + "/GetSize");
-    }
-
+  @VisibleForTesting
+  static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult(
+      WindowedSplitResult windowedSplitResult,
+      HandlesSplits.SplitResult downstreamElementSplit,
+      Coder fullInputCoder,
+      Instant initialWatermark,
+      KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
+      String pTransformId,
+      String mainInputId,
+      Collection<String> outputIds,
+      Duration resumeDelay) {
+    // The element split cannot from both windowedSplitResult and downstreamElementSplit.
+    checkArgument(
+        (windowedSplitResult == null || windowedSplitResult.getResidualSplitRoot() == null)
+            || downstreamElementSplit == null);
     List<BundleApplication> primaryRoots = new ArrayList<>();
     List<DelayedBundleApplication> residualRoots = new ArrayList<>();
-    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+
+    // Encode window splits.
     if (windowedSplitResult != null
         && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
       ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
@@ -1630,7 +1499,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                 .setSeconds(initialWatermark.getMillis() / 1000)
                 .setNanos((int) (initialWatermark.getMillis() % 1000) * 1000000)
                 .build();
-        for (String outputId : pTransform.getOutputsMap().keySet()) {
+        for (String outputId : outputIds) {
           outputWatermarkMapForUnprocessedWindows.put(outputId, outputWatermark);
         }
       }
@@ -1642,9 +1511,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
               .build());
     }
 
-    if (windowedSplitResult.getResidualSplitRoot() != null) {
-      ByteString.Output primaryBytes = ByteString.newOutput();
-      ByteString.Output residualBytes = ByteString.newOutput();
+    ByteString.Output primaryBytes = ByteString.newOutput();
+    ByteString.Output residualBytes = ByteString.newOutput();
+    // Encode element split from windowedSplitResult or from downstream element split. It's possible
+    // that there is no element split.
+    if (windowedSplitResult != null && windowedSplitResult.getResidualSplitRoot() != null) {
+      // When there is element split in windowedSplitResult, the resumeDelay should not be null.
+      checkNotNull(resumeDelay);
       try {
         fullInputCoder.encode(windowedSplitResult.getPrimarySplitRoot(), primaryBytes);
         fullInputCoder.encode(windowedSplitResult.getResidualSplitRoot(), residualBytes);
@@ -1670,7 +1543,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                 .setSeconds(watermarkAndState.getKey().getMillis() / 1000)
                 .setNanos((int) (watermarkAndState.getKey().getMillis() % 1000) * 1000000)
                 .build();
-        for (String outputId : pTransform.getOutputsMap().keySet()) {
+        for (String outputId : outputIds) {
           outputWatermarkMap.put(outputId, outputWatermark);
         }
       }
@@ -1680,11 +1553,64 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
               .setApplication(residualApplication)
               .setRequestedTimeDelay(Durations.fromMillis(resumeDelay.getMillis()))
               .build());
+
+    } else if (downstreamElementSplit != null) {
+      primaryRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getPrimaryRoots()));
+      residualRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getResidualRoots()));
     }
 
     return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
   }
 
+  private HandlesSplits.SplitResult trySplitForElementAndRestriction(
+      double fractionOfRemainder, Duration resumeDelay) {
+    KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
+    WindowedSplitResult windowedSplitResult = null;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between element and restriction processing calls.
+      if (currentTracker == null) {
+        return null;
+      }
+      // Make sure to get the output watermark before we split to ensure that the lower bound
+      // applies to the residual.
+      watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
+      SplitResultsWithStopIndex splitResult =
+          computeSplitForProcessOrTruncate(
+              currentElement,
+              currentRestriction,
+              currentWindow,
+              currentWindows,
+              currentWatermarkEstimatorState,
+              fractionOfRemainder,
+              currentTracker,
+              null,
+              watermarkAndState,
+              windowCurrentIndex,
+              windowStopIndex);
+      if (splitResult == null) {
+        return null;
+      }
+      windowStopIndex = splitResult.getNewWindowStopIndex();
+      // Populate the size of primary/residual.
+      windowedSplitResult =
+          calculateRestrictionSize(
+              splitResult.getWindowSplit(),
+              PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
+                  + "/GetSize");
+    }
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    return constructSplitResult(
+        windowedSplitResult,
+        null,
+        fullInputCoder,
+        initialWatermark,
+        watermarkAndState,
+        pTransformId,
+        mainInputId,
+        pTransform.getOutputsMap().keySet(),
+        resumeDelay);
+  }
+
   private <K> void processTimer(
       String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
     currentTimer = timer;
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index b388587..27a7a76 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import org.apache.beam.fn.harness.FnApiDoFnRunner.SplitResultsWithStopIndex;
 import org.apache.beam.fn.harness.FnApiDoFnRunner.WindowedSplitResult;
 import org.apache.beam.fn.harness.HandlesSplits.SplitResult;
 import org.apache.beam.fn.harness.PTransformRunnerFactory.ProgressRequestCallback;
@@ -82,8 +83,11 @@ import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.fn.data.FnDataReceiver;
 import org.apache.beam.sdk.fn.data.LogicalEndpoint;
 import org.apache.beam.sdk.function.ThrowingRunnable;
@@ -134,6 +138,7 @@ import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
 import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.util.Durations;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -145,6 +150,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.slf4j.Logger;
@@ -3768,14 +3774,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
 
   @RunWith(JUnit4.class)
   public static class SplitTest {
+    @Rule public final ExpectedException expected = ExpectedException.none();
     private IntervalWindow window1;
     private IntervalWindow window2;
     private IntervalWindow window3;
     private WindowedValue<String> currentElement;
     private OffsetRange currentRestriction;
     private Instant currentWatermarkEstimatorState;
+    private Instant initialWatermark;
     KV<Instant, Instant> watermarkAndState;
 
+    private static final String PROCESS_TRANSFORM_ID = "processPTransformId";
+    private static final String TRUNCATE_TRANSFORM_ID = "truncatePTransformId";
+    private static final String PROCESS_INPUT_ID = "processInputId";
+    private static final String TRUNCATE_INPUT_ID = "truncateInputId";
+    private static final String PROCESS_OUTPUT_ID = "processOutputId";
+    private static final String TRUNCATE_OUTPUT_ID = "truncateOutputId";
+
     private KV<WindowedValue, WindowedValue> createSplitInWindow(
         OffsetRange primaryRestriction, OffsetRange residualRestriction, BoundedWindow window) {
       return KV.of(
@@ -3818,6 +3833,56 @@ public class FnApiDoFnRunnerTest implements Serializable {
                   currentElement.getPane()));
     }
 
+    private KV<WindowedValue, WindowedValue> createSplitWithSizeInWindow(
+        OffsetRange primaryRestriction, OffsetRange residualRestriction, BoundedWindow window) {
+      return KV.of(
+          WindowedValue.of(
+              KV.of(
+                  KV.of(
+                      currentElement.getValue(),
+                      KV.of(primaryRestriction, currentWatermarkEstimatorState)),
+                  (double) (primaryRestriction.getTo() - primaryRestriction.getFrom())),
+              currentElement.getTimestamp(),
+              window,
+              currentElement.getPane()),
+          WindowedValue.of(
+              KV.of(
+                  KV.of(
+                      currentElement.getValue(),
+                      KV.of(residualRestriction, watermarkAndState.getValue())),
+                  (double) (residualRestriction.getTo() - residualRestriction.getFrom())),
+              currentElement.getTimestamp(),
+              window,
+              currentElement.getPane()));
+    }
+
+    private KV<WindowedValue, WindowedValue> createSplitWithSizeAcrossWindows(
+        List<BoundedWindow> primaryWindows, List<BoundedWindow> residualWindows) {
+      return KV.of(
+          primaryWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      (double) (currentRestriction.getTo() - currentRestriction.getFrom())),
+                  currentElement.getTimestamp(),
+                  primaryWindows,
+                  currentElement.getPane()),
+          residualWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      (double) (currentRestriction.getTo() - currentRestriction.getFrom())),
+                  currentElement.getTimestamp(),
+                  residualWindows,
+                  currentElement.getPane()));
+    }
+
     @Before
     public void setUp() {
       window1 = new IntervalWindow(Instant.ofEpochMilli(0), Instant.ofEpochMilli(10));
@@ -3831,11 +3896,12 @@ public class FnApiDoFnRunnerTest implements Serializable {
               PaneInfo.NO_FIRING);
       currentRestriction = new OffsetRange(0L, 100L);
       currentWatermarkEstimatorState = Instant.ofEpochMilli(21);
+      initialWatermark = Instant.ofEpochMilli(25);
       watermarkAndState = KV.of(Instant.ofEpochMilli(42), Instant.ofEpochMilli(42));
     }
 
     @Test
-    public void testScaleProgress() throws Exception {
+    public void testScaledProgress() throws Exception {
       Progress elementProgress = Progress.from(2, 8);
       // There is only one window.
       Progress scaledResult = FnApiDoFnRunner.scaleProgress(elementProgress, 0, 1);
@@ -3859,12 +3925,66 @@ public class FnApiDoFnRunnerTest implements Serializable {
     }
 
     @Test
+    public void testComputeSplitForProcessOrTruncateWithNullTrackerAndSplitDelegate()
+        throws Exception {
+      expected.expect(IllegalArgumentException.class);
+      FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+          currentElement,
+          currentRestriction,
+          window1,
+          ImmutableList.copyOf(currentElement.getWindows()),
+          currentWatermarkEstimatorState,
+          0.0,
+          null,
+          null,
+          null,
+          0,
+          3);
+    }
+
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithNotNullTrackerAndDelegate()
+        throws Exception {
+      expected.expect(IllegalArgumentException.class);
+      FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+          currentElement,
+          currentRestriction,
+          window1,
+          ImmutableList.copyOf(currentElement.getWindows()),
+          currentWatermarkEstimatorState,
+          0.0,
+          new OffsetRangeTracker(currentRestriction),
+          createSplitDelegate(0.3, 0.0, null),
+          null,
+          0,
+          3);
+    }
+
+    @Test
+    public void testComputeSplitForProcessOrTruncateWithInvalidWatermarkAndState()
+        throws Exception {
+      expected.expect(NullPointerException.class);
+      FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+          currentElement,
+          currentRestriction,
+          window1,
+          ImmutableList.copyOf(currentElement.getWindows()),
+          currentWatermarkEstimatorState,
+          0.0,
+          new OffsetRangeTracker(currentRestriction),
+          null,
+          null,
+          0,
+          3);
+    }
+
+    @Test
     public void testTrySplitForProcessCheckpointOnFirstWindow() throws Exception {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.<Instant>computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
@@ -3872,20 +3992,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.0,
               tracker,
+              null,
               watermarkAndState,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedElementSplit =
           createSplitInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window1);
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
-      assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
-      assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+      assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+      assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -3893,8 +4016,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.<Instant>computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
@@ -3902,20 +4025,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.0,
               tracker,
+              null,
               watermarkAndState,
               0,
               2);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedElementSplit =
           createSplitInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window1);
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2));
-      assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
-      assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+      assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+      assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -3923,8 +4049,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.<Instant>computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
@@ -3932,20 +4058,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.2,
               tracker,
+              null,
               watermarkAndState,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedElementSplit =
           createSplitInWindow(new OffsetRange(0, 84), new OffsetRange(84, 100), window1);
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
-      assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
-      assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+      assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+      assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -3953,8 +4082,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window2,
@@ -3962,22 +4091,25 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.2,
               tracker,
+              null,
               watermarkAndState,
               1,
               3);
-      assertEquals(2, (int) result.getValue());
+      assertEquals(2, result.getNewWindowStopIndex());
       // Java uses BigDecimal so 0.2 * 170 = 63.9...
       // BigDecimal.longValue() will round down to 63 instead of the expected 64
       KV<WindowedValue, WindowedValue> expectedElementSplit =
           createSplitInWindow(new OffsetRange(0, 63), new OffsetRange(63, 100), window2);
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
-      assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
-      assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+      assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+      assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -3985,8 +4117,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window3,
@@ -3994,20 +4126,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.2,
               tracker,
+              null,
               watermarkAndState,
               2,
               3);
-      assertEquals(3, (int) result.getValue());
+      assertEquals(3, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedElementSplit =
           createSplitInWindow(new OffsetRange(0, 44), new OffsetRange(44, 100), window3);
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of());
-      assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
-      assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+      assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+      assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4016,8 +4151,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(100L);
       assertNull(tracker.trySplit(0.0));
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window3,
@@ -4025,18 +4160,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0,
               tracker,
+              null,
               watermarkAndState,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
-      assertNull(result.getKey().getPrimarySplitRoot());
-      assertNull(result.getKey().getResidualSplitRoot());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4045,8 +4183,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(100L);
       assertNull(tracker.trySplit(0.0));
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window3,
@@ -4054,6 +4192,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0,
               tracker,
+              null,
               watermarkAndState,
               2,
               3);
@@ -4065,8 +4204,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window2,
@@ -4074,18 +4213,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.6,
               tracker,
+              null,
               watermarkAndState,
               0,
               3);
-      assertEquals(2, (int) result.getValue());
+      assertEquals(2, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
-      assertNull(result.getKey().getPrimarySplitRoot());
-      assertNull(result.getKey().getResidualSplitRoot());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4093,8 +4235,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window2,
@@ -4102,18 +4244,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.3,
               tracker,
+              null,
               watermarkAndState,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
-      assertNull(result.getKey().getPrimarySplitRoot());
-      assertNull(result.getKey().getResidualSplitRoot());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4121,8 +4266,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
       tracker.tryClaim(30L);
-      KV<WindowedSplitResult, Integer> result =
-          FnApiDoFnRunner.trySplitForProcess(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window2,
@@ -4130,18 +4275,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
               currentWatermarkEstimatorState,
               0.9,
               tracker,
+              null,
               watermarkAndState,
               0,
               3);
-      assertEquals(2, (int) result.getValue());
+      assertEquals(2, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
-      assertNull(result.getKey().getPrimarySplitRoot());
-      assertNull(result.getKey().getResidualSplitRoot());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
-          expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+          expectedWindowSplit.getKey(),
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
-          expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+          expectedWindowSplit.getValue(),
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     private HandlesSplits createSplitDelegate(
@@ -4168,27 +4316,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, splitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.0,
+              null,
               splitDelegate,
+              null,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
-      assertEquals(splitResult, result.getKey().getValue());
+      assertEquals(splitResult, result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4199,27 +4351,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, splitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.0,
+              null,
               splitDelegate,
+              null,
               0,
               2);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2));
-      assertEquals(splitResult, result.getKey().getValue());
+      assertEquals(splitResult, result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4230,27 +4386,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.54, splitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.2,
+              null,
               splitDelegate,
+              null,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
-      assertEquals(splitResult, result.getKey().getValue());
+      assertEquals(splitResult, result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4261,27 +4421,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.34, splitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.2,
+              null,
               splitDelegate,
+              null,
               1,
               3);
-      assertEquals(2, (int) result.getValue());
+      assertEquals(2, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
-      assertEquals(splitResult, result.getKey().getValue());
+      assertEquals(splitResult, result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4292,27 +4456,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.2, splitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.2,
+              null,
               splitDelegate,
+              null,
               2,
               3);
-      assertEquals(3, (int) result.getValue());
+      assertEquals(3, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of());
-      assertEquals(splitResult, result.getKey().getValue());
+      assertEquals(splitResult, result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4323,42 +4491,48 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(1.0, 0.0, unusedSplitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.0,
+              null,
               splitDelegate,
+              null,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
-      assertNull(result.getKey().getValue());
+      assertNull(result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
     public void testTrySplitForTruncateSplitOnLastWindowWhenNoElementSplit() throws Exception {
       List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
       HandlesSplits splitDelegate = createSplitDelegate(1.0, 0.0, null);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.0,
+              null,
               splitDelegate,
+              null,
               2,
               3);
       assertNull(result);
@@ -4372,27 +4546,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, unusedSplitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.6,
+              null,
               splitDelegate,
+              null,
               0,
               3);
-      assertEquals(2, (int) result.getValue());
+      assertEquals(2, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
-      assertNull(result.getKey().getValue());
+      assertNull(result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4403,27 +4581,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, unusedSplitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.3,
+              null,
               splitDelegate,
+              null,
               0,
               3);
-      assertEquals(1, (int) result.getValue());
+      assertEquals(1, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
-      assertNull(result.getKey().getValue());
+      assertNull(result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
     }
 
     @Test
@@ -4434,27 +4616,313 @@ public class FnApiDoFnRunnerTest implements Serializable {
               ImmutableList.of(BundleApplication.getDefaultInstance()),
               ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
       HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, unusedSplitResult);
-      KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
-          FnApiDoFnRunner.trySplitForTruncate(
+      SplitResultsWithStopIndex result =
+          FnApiDoFnRunner.computeSplitForProcessOrTruncate(
               currentElement,
               currentRestriction,
               window1,
               windows,
               currentWatermarkEstimatorState,
               0.6,
+              null,
               splitDelegate,
+              null,
               0,
               3);
-      assertEquals(2, (int) result.getValue());
+      assertEquals(2, result.getNewWindowStopIndex());
       KV<WindowedValue, WindowedValue> expectedWindowSplit =
           createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
-      assertNull(result.getKey().getValue());
+      assertNull(result.getDownstreamSplit());
+      assertNull(result.getWindowSplit().getPrimarySplitRoot());
+      assertNull(result.getWindowSplit().getResidualSplitRoot());
       assertEquals(
           expectedWindowSplit.getKey(),
-          result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+          result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
       assertEquals(
           expectedWindowSplit.getValue(),
-          result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+          result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
+    }
+
+    @Test
+    public void testConstructSplitResultWithInvalidElementSplits() throws Exception {
+      expected.expect(IllegalArgumentException.class);
+      FnApiDoFnRunner.constructSplitResult(
+          WindowedSplitResult.forRoots(
+              null,
+              WindowedValue.valueInGlobalWindow("elementPrimary"),
+              WindowedValue.valueInGlobalWindow("elementResidual"),
+              null),
+          HandlesSplits.SplitResult.of(
+              ImmutableList.of(BundleApplication.getDefaultInstance()),
+              ImmutableList.of(DelayedBundleApplication.getDefaultInstance())),
+          WindowedValue.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE),
+          Instant.now(),
+          null,
+          "ptransformId",
+          "inputId",
+          ImmutableList.of("outputId"),
+          null);
+    }
+
+    private Coder getFullInputCoder(
+        Coder elementCoder, Coder restrictionCoder, Coder watermarkStateCoder, Coder windowCoder) {
+      Coder inputCoder =
+          KvCoder.of(
+              KvCoder.of(elementCoder, KvCoder.of(restrictionCoder, watermarkStateCoder)),
+              DoubleCoder.of());
+      return WindowedValue.getFullCoder(inputCoder, windowCoder);
+    }
+
+    private HandlesSplits.SplitResult getProcessElementSplit(String transformId, String inputId) {
+      return SplitResult.of(
+          ImmutableList.of(
+              BundleApplication.newBuilder()
+                  .setTransformId(transformId)
+                  .setInputId(inputId)
+                  .build()),
+          ImmutableList.of(
+              DelayedBundleApplication.newBuilder()
+                  .setApplication(
+                      BundleApplication.newBuilder()
+                          .setTransformId(transformId)
+                          .setInputId(inputId)
+                          .build())
+                  .setRequestedTimeDelay(Durations.fromMillis(1000L))
+                  .build()));
+    }
+
+    private org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp toTimestamp(
+        Instant time) {
+      return org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
+          .setSeconds(time.getMillis() / 1000)
+          .setNanos((int) (time.getMillis() % 1000) * 1000000)
+          .build();
+    }
+
+    @Test
+    public void testConstructSplitResultWithElementSplitFromDelegate() throws Exception {
+      Coder fullInputCoder =
+          getFullInputCoder(
+              StringUtf8Coder.of(),
+              OffsetRange.Coder.of(),
+              InstantCoder.of(),
+              IntervalWindow.getCoder());
+      HandlesSplits.SplitResult elementSplit =
+          getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
+      HandlesSplits.SplitResult result =
+          FnApiDoFnRunner.constructSplitResult(
+              null,
+              elementSplit,
+              fullInputCoder,
+              null,
+              null,
+              TRUNCATE_TRANSFORM_ID,
+              TRUNCATE_INPUT_ID,
+              ImmutableList.of(TRUNCATE_OUTPUT_ID),
+              null);
+      assertEquals(elementSplit.getPrimaryRoots(), result.getPrimaryRoots());
+      assertEquals(elementSplit.getResidualRoots(), result.getResidualRoots());
+    }
+
+    @Test
+    public void testConstructSplitResultWithElementSplitFromTracker() throws Exception {
+      Coder fullInputCoder =
+          getFullInputCoder(
+              StringUtf8Coder.of(),
+              OffsetRange.Coder.of(),
+              InstantCoder.of(),
+              IntervalWindow.getCoder());
+      KV<WindowedValue, WindowedValue> elementSplit =
+          createSplitWithSizeInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window1);
+      HandlesSplits.SplitResult result =
+          FnApiDoFnRunner.constructSplitResult(
+              WindowedSplitResult.forRoots(
+                  null, elementSplit.getKey(), elementSplit.getValue(), null),
+              null,
+              fullInputCoder,
+              null,
+              watermarkAndState,
+              PROCESS_TRANSFORM_ID,
+              PROCESS_INPUT_ID,
+              ImmutableList.of(PROCESS_OUTPUT_ID),
+              Duration.millis(100L));
+      assertEquals(1, result.getPrimaryRoots().size());
+      BundleApplication primaryRoot = result.getPrimaryRoots().get(0);
+      assertEquals(PROCESS_TRANSFORM_ID, primaryRoot.getTransformId());
+      assertEquals(PROCESS_INPUT_ID, primaryRoot.getInputId());
+      assertEquals(
+          elementSplit.getKey(), fullInputCoder.decode(primaryRoot.getElement().newInput()));
+
+      assertEquals(1, result.getResidualRoots().size());
+      DelayedBundleApplication residualRoot = result.getResidualRoots().get(0);
+      assertEquals(Durations.fromMillis(100L), residualRoot.getRequestedTimeDelay());
+      assertEquals(PROCESS_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
+      assertEquals(PROCESS_INPUT_ID, residualRoot.getApplication().getInputId());
+      assertEquals(
+          toTimestamp(watermarkAndState.getValue()),
+          residualRoot.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+      assertEquals(
+          elementSplit.getValue(),
+          fullInputCoder.decode(residualRoot.getApplication().getElement().newInput()));
+    }
+
+    @Test
+    public void testConstructSplitResultWithOnlyWindowSplits() throws Exception {
+      Coder fullInputCoder =
+          getFullInputCoder(
+              StringUtf8Coder.of(),
+              OffsetRange.Coder.of(),
+              InstantCoder.of(),
+              IntervalWindow.getCoder());
+      KV<WindowedValue, WindowedValue> windowSplit =
+          createSplitWithSizeAcrossWindows(
+              ImmutableList.of(window1), ImmutableList.of(window2, window3));
+      HandlesSplits.SplitResult result =
+          FnApiDoFnRunner.constructSplitResult(
+              WindowedSplitResult.forRoots(
+                  windowSplit.getKey(), null, null, windowSplit.getValue()),
+              null,
+              fullInputCoder,
+              initialWatermark,
+              watermarkAndState,
+              PROCESS_TRANSFORM_ID,
+              PROCESS_INPUT_ID,
+              ImmutableList.of(PROCESS_OUTPUT_ID),
+              Duration.millis(100L));
+      assertEquals(1, result.getPrimaryRoots().size());
+      BundleApplication primaryRoot = result.getPrimaryRoots().get(0);
+      assertEquals(PROCESS_TRANSFORM_ID, primaryRoot.getTransformId());
+      assertEquals(PROCESS_INPUT_ID, primaryRoot.getInputId());
+      assertEquals(
+          windowSplit.getKey(), fullInputCoder.decode(primaryRoot.getElement().newInput()));
+
+      assertEquals(1, result.getResidualRoots().size());
+      DelayedBundleApplication residualRoot = result.getResidualRoots().get(0);
+      assertEquals(
+          org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Duration.getDefaultInstance(),
+          residualRoot.getRequestedTimeDelay());
+      assertEquals(PROCESS_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
+      assertEquals(PROCESS_INPUT_ID, residualRoot.getApplication().getInputId());
+      assertEquals(
+          toTimestamp(initialWatermark),
+          residualRoot.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+      assertEquals(
+          windowSplit.getValue(),
+          fullInputCoder.decode(residualRoot.getApplication().getElement().newInput()));
+    }
+
+    @Test
+    public void testConstructSplitResultWithElementAndWindowSplitFromProcess() throws Exception {
+      Coder fullInputCoder =
+          getFullInputCoder(
+              StringUtf8Coder.of(),
+              OffsetRange.Coder.of(),
+              InstantCoder.of(),
+              IntervalWindow.getCoder());
+      KV<WindowedValue, WindowedValue> windowSplit =
+          createSplitWithSizeAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
+      KV<WindowedValue, WindowedValue> elementSplit =
+          createSplitWithSizeInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window2);
+      HandlesSplits.SplitResult result =
+          FnApiDoFnRunner.constructSplitResult(
+              WindowedSplitResult.forRoots(
+                  windowSplit.getKey(),
+                  elementSplit.getKey(),
+                  elementSplit.getValue(),
+                  windowSplit.getValue()),
+              null,
+              fullInputCoder,
+              initialWatermark,
+              watermarkAndState,
+              PROCESS_TRANSFORM_ID,
+              PROCESS_INPUT_ID,
+              ImmutableList.of(PROCESS_OUTPUT_ID),
+              Duration.millis(100L));
+      assertEquals(2, result.getPrimaryRoots().size());
+      BundleApplication windowPrimary = result.getPrimaryRoots().get(0);
+      BundleApplication elementPrimary = result.getPrimaryRoots().get(1);
+      assertEquals(PROCESS_TRANSFORM_ID, windowPrimary.getTransformId());
+      assertEquals(PROCESS_INPUT_ID, windowPrimary.getInputId());
+      assertEquals(
+          windowSplit.getKey(), fullInputCoder.decode(windowPrimary.getElement().newInput()));
+      assertEquals(PROCESS_TRANSFORM_ID, elementPrimary.getTransformId());
+      assertEquals(PROCESS_INPUT_ID, elementPrimary.getInputId());
+      assertEquals(
+          elementSplit.getKey(), fullInputCoder.decode(elementPrimary.getElement().newInput()));
+
+      assertEquals(2, result.getResidualRoots().size());
+      DelayedBundleApplication windowResidual = result.getResidualRoots().get(0);
+      DelayedBundleApplication elementResidual = result.getResidualRoots().get(1);
+      assertEquals(
+          org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Duration.getDefaultInstance(),
+          windowResidual.getRequestedTimeDelay());
+      assertEquals(PROCESS_TRANSFORM_ID, windowResidual.getApplication().getTransformId());
+      assertEquals(PROCESS_INPUT_ID, windowResidual.getApplication().getInputId());
+      assertEquals(
+          toTimestamp(initialWatermark),
+          windowResidual.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+      assertEquals(
+          windowSplit.getValue(),
+          fullInputCoder.decode(windowResidual.getApplication().getElement().newInput()));
+      assertEquals(Durations.fromMillis(100L), elementResidual.getRequestedTimeDelay());
+      assertEquals(PROCESS_TRANSFORM_ID, elementResidual.getApplication().getTransformId());
+      assertEquals(PROCESS_INPUT_ID, elementResidual.getApplication().getInputId());
+      assertEquals(
+          toTimestamp(watermarkAndState.getValue()),
+          elementResidual.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+      assertEquals(
+          elementSplit.getValue(),
+          fullInputCoder.decode(elementResidual.getApplication().getElement().newInput()));
+    }
+
+    @Test
+    public void testConstructSplitResultWithElementAndWindowSplitFromTruncate() throws Exception {
+      Coder fullInputCoder =
+          getFullInputCoder(
+              StringUtf8Coder.of(),
+              OffsetRange.Coder.of(),
+              InstantCoder.of(),
+              IntervalWindow.getCoder());
+      KV<WindowedValue, WindowedValue> windowSplit =
+          createSplitWithSizeAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
+      HandlesSplits.SplitResult elementSplit =
+          getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
+      HandlesSplits.SplitResult result =
+          FnApiDoFnRunner.constructSplitResult(
+              WindowedSplitResult.forRoots(
+                  windowSplit.getKey(), null, null, windowSplit.getValue()),
+              elementSplit,
+              fullInputCoder,
+              initialWatermark,
+              watermarkAndState,
+              TRUNCATE_TRANSFORM_ID,
+              TRUNCATE_INPUT_ID,
+              ImmutableList.of(TRUNCATE_OUTPUT_ID),
+              Duration.millis(100L));
+      assertEquals(2, result.getPrimaryRoots().size());
+      BundleApplication windowPrimary = result.getPrimaryRoots().get(0);
+      BundleApplication elementPrimary = result.getPrimaryRoots().get(1);
+      assertEquals(TRUNCATE_TRANSFORM_ID, windowPrimary.getTransformId());
+      assertEquals(TRUNCATE_INPUT_ID, windowPrimary.getInputId());
+      assertEquals(
+          windowSplit.getKey(), fullInputCoder.decode(windowPrimary.getElement().newInput()));
+      assertEquals(elementSplit.getPrimaryRoots().get(0), elementPrimary);
+
+      assertEquals(2, result.getResidualRoots().size());
+      DelayedBundleApplication windowResidual = result.getResidualRoots().get(0);
+      DelayedBundleApplication elementResidual = result.getResidualRoots().get(1);
+      assertEquals(
+          org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Duration.getDefaultInstance(),
+          windowResidual.getRequestedTimeDelay());
+      assertEquals(TRUNCATE_TRANSFORM_ID, windowResidual.getApplication().getTransformId());
+      assertEquals(TRUNCATE_INPUT_ID, windowResidual.getApplication().getInputId());
+      assertEquals(
+          toTimestamp(initialWatermark),
+          windowResidual.getApplication().getOutputWatermarksMap().get(TRUNCATE_OUTPUT_ID));
+      assertEquals(
+          windowSplit.getValue(),
+          fullInputCoder.decode(windowResidual.getApplication().getElement().newInput()));
+      assertEquals(elementSplit.getResidualRoots().get(0), elementResidual);
     }
   }
 }