You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/09/02 19:32:56 UTC
[beam] branch master updated: Refactor split logic to reuse common logic.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 02a6da9 Refactor split logic to reuse common logic.
new b0abce2 Merge pull request #12710 from boyuanzz/split_refactor
02a6da9 is described below
commit 02a6da9ab3d59e6cba2530153e8ca5e7598d1174
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Aug 27 16:36:55 2020 -0700
Refactor split logic to reuse common logic.
---
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 410 +++++-------
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 712 +++++++++++++++++----
2 files changed, 758 insertions(+), 364 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 9c7ee2e..f77d8d6 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -984,6 +984,23 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
public abstract @Nullable WindowedValue getResidualInUnprocessedWindowsRoot();
}
+ @AutoValue
+ abstract static class SplitResultsWithStopIndex {
+ public static SplitResultsWithStopIndex of(
+ WindowedSplitResult windowSplit,
+ HandlesSplits.SplitResult downstreamSplit,
+ int newWindowStopIndex) {
+ return new AutoValue_FnApiDoFnRunner_SplitResultsWithStopIndex(
+ windowSplit, downstreamSplit, newWindowStopIndex);
+ }
+
+ public abstract @Nullable WindowedSplitResult getWindowSplit();
+
+ public abstract HandlesSplits.@Nullable SplitResult getDownstreamSplit();
+
+ public abstract int getNewWindowStopIndex();
+ }
+
private void processElementForWindowObservingSizedElementAndRestriction(
WindowedValue<KV<KV<InputT, KV<RestrictionT, WatermarkEstimatorStateT>>, Double>> elem) {
currentElement = elem.withValue(elem.getValue().getKey().getKey());
@@ -1195,102 +1212,8 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
splitResult.getResidualInUnprocessedWindowsRoot().getPane()));
}
- @VisibleForTesting
- static <WatermarkEstimatorStateT>
- KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> trySplitForTruncate(
- WindowedValue currentElement,
- Object currentRestriction,
- BoundedWindow currentWindow,
- List<BoundedWindow> windows,
- WatermarkEstimatorStateT currentWatermarkEstimatorState,
- double fractionOfRemainder,
- HandlesSplits splitDelegate,
- int currentWindowIndex,
- int stopWindowIndex) {
- WindowedSplitResult windowedSplitResult = null;
- HandlesSplits.SplitResult downstreamSplitResult = null;
- int newWindowStopIndex = stopWindowIndex;
- // If we are not on the last window, try to compute the split which is on the current window or
- // on a future window.
- if (currentWindowIndex != stopWindowIndex - 1) {
- // Compute the fraction of the remainder relative to the scaled progress.
- double elementCompleted = splitDelegate.getProgress();
- Progress elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
- Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
- double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
- // The fraction is out of the current window and hence we will split at the closest window
- // boundary.
- if (scaledFractionOfRemainder >= elementProgress.getWorkRemaining()) {
- newWindowStopIndex =
- (int)
- Math.min(
- stopWindowIndex - 1,
- currentWindowIndex
- + Math.max(
- 1,
- Math.round(
- (elementProgress.getWorkCompleted() + scaledFractionOfRemainder)
- / (elementProgress.getWorkCompleted()
- + elementProgress.getWorkRemaining()))));
- windowedSplitResult =
- computeWindowSplitResult(
- currentElement,
- currentRestriction,
- currentWindow,
- windows,
- currentWatermarkEstimatorState,
- newWindowStopIndex,
- newWindowStopIndex,
- stopWindowIndex,
- null,
- null);
-
- } else {
- // Compute the downstream element split with the scaled fraction.
- downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
- newWindowStopIndex = currentWindowIndex + 1;
- windowedSplitResult =
- computeWindowSplitResult(
- currentElement,
- currentRestriction,
- currentWindow,
- windows,
- currentWatermarkEstimatorState,
- currentWindowIndex,
- newWindowStopIndex,
- stopWindowIndex,
- null,
- null);
- }
- } else {
- // We are on the last window then compute the downstream element split with given fraction.
- newWindowStopIndex = stopWindowIndex;
- downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
- // We cannot produce any split if the downstream is not splittable.
- if (downstreamSplitResult == null) {
- return null;
- }
- windowedSplitResult =
- computeWindowSplitResult(
- currentElement,
- currentRestriction,
- currentWindow,
- windows,
- currentWatermarkEstimatorState,
- currentWindowIndex,
- stopWindowIndex,
- stopWindowIndex,
- null,
- null);
- }
- return KV.of(KV.of(windowedSplitResult, downstreamSplitResult), newWindowStopIndex);
- }
-
private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
double fractionOfRemainder, HandlesSplits splitDelegate) {
- // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
- // the same as the SDF/Process transform.
- Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
WindowedSplitResult windowedSplitResult = null;
HandlesSplits.SplitResult downstreamSplitResult = null;
synchronized (splitLock) {
@@ -1299,92 +1222,42 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
return null;
}
- KV<KV<WindowedSplitResult, HandlesSplits.SplitResult>, Integer> result =
- trySplitForTruncate(
+ SplitResultsWithStopIndex splitResult =
+ computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
currentWindow,
currentWindows,
currentWatermarkEstimatorState,
fractionOfRemainder,
+ null,
splitDelegate,
+ null,
windowCurrentIndex,
windowStopIndex);
- if (result == null) {
+ if (splitResult == null) {
return null;
}
- windowStopIndex = result.getValue();
+ windowStopIndex = splitResult.getNewWindowStopIndex();
windowedSplitResult =
calculateRestrictionSize(
- result.getKey().getKey(),
+ splitResult.getWindowSplit(),
PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN + "/GetSize");
- downstreamSplitResult = result.getKey().getValue();
- }
-
- List<BundleApplication> primaryRoots = new ArrayList<>();
- List<DelayedBundleApplication> residualRoots = new ArrayList<>();
-
- if (windowedSplitResult != null
- && windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
- ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
- try {
- fullInputCoder.encode(
- windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot(),
- primaryInOtherWindowsBytes);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- BundleApplication.Builder primaryApplicationInOtherWindows =
- BundleApplication.newBuilder()
- .setTransformId(pTransformId)
- .setInputId(mainInputId)
- .setElement(primaryInOtherWindowsBytes.toByteString());
- primaryRoots.add(primaryApplicationInOtherWindows.build());
- }
- if (windowedSplitResult != null
- && windowedSplitResult.getResidualInUnprocessedWindowsRoot() != null) {
- ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
- try {
- fullInputCoder.encode(
- windowedSplitResult.getResidualInUnprocessedWindowsRoot(),
- residualInUnprocessedWindowsBytesOut);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- // We don't want to change the output watermarks or set the checkpoint resume time since
- // that applies to the current window.
- Map<String, org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp>
- outputWatermarkMap = new HashMap<>();
- if (!initialWatermark.equals(GlobalWindow.TIMESTAMP_MIN_VALUE)) {
- org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp outputWatermark =
- org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
- .setSeconds(initialWatermark.getMillis() / 1000)
- .setNanos((int) (initialWatermark.getMillis() % 1000) * 1000000)
- .build();
- for (String outputId : pTransform.getOutputsMap().keySet()) {
- outputWatermarkMap.put(outputId, outputWatermark);
- }
- }
-
- BundleApplication.Builder residualApplicationInUnprocessedWindows =
- BundleApplication.newBuilder()
- .setTransformId(pTransformId)
- .setInputId(mainInputId)
- .putAllOutputWatermarks(outputWatermarkMap)
- .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
-
- residualRoots.add(
- DelayedBundleApplication.newBuilder()
- .setApplication(residualApplicationInUnprocessedWindows)
- .build());
+ downstreamSplitResult = splitResult.getDownstreamSplit();
}
-
- if (downstreamSplitResult != null) {
- primaryRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getPrimaryRoots()));
- residualRoots.add(Iterables.getOnlyElement(downstreamSplitResult.getResidualRoots()));
- }
-
- return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
+ // Note that the assumption here is that the fullInputCoder of the Truncate transform is
+ // the same as that of the SDF/Process transform.
+ Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+ return constructSplitResult(
+ windowedSplitResult,
+ downstreamSplitResult,
+ fullInputCoder,
+ initialWatermark,
+ null,
+ pTransformId,
+ mainInputId,
+ pTransform.getOutputsMap().keySet(),
+ null);
}
private static <WatermarkEstimatorStateT> WindowedSplitResult computeWindowSplitResult(
@@ -1444,7 +1317,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
}
@VisibleForTesting
- static <WatermarkEstimatorStateT> KV<WindowedSplitResult, Integer> trySplitForProcess(
+ static <WatermarkEstimatorStateT> SplitResultsWithStopIndex computeSplitForProcessOrTruncate(
WindowedValue currentElement,
Object currentRestriction,
BoundedWindow currentWindow,
@@ -1452,20 +1325,34 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
WatermarkEstimatorStateT currentWatermarkEstimatorState,
double fractionOfRemainder,
RestrictionTracker currentTracker,
+ HandlesSplits splitDelegate,
KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
int currentWindowIndex,
int stopWindowIndex) {
+ // Exactly one of currentTracker and splitDelegate must be provided.
+ checkArgument((currentTracker != null) ^ (splitDelegate != null));
+ // When we have currentTracker, the watermarkAndState should not be null.
+ if (currentTracker != null) {
+ checkNotNull(watermarkAndState);
+ }
+
WindowedSplitResult windowedSplitResult = null;
+ HandlesSplits.SplitResult downstreamSplitResult = null;
int newWindowStopIndex = stopWindowIndex;
// If we are not on the last window, try to compute the split which is on the current window or
// on a future window.
if (currentWindowIndex != stopWindowIndex - 1) {
// Compute the fraction of the remainder relative to the scaled progress.
Progress elementProgress;
- if (currentTracker instanceof HasProgress) {
- elementProgress = ((HasProgress) currentTracker).getProgress();
+ if (currentTracker != null) {
+ if (currentTracker instanceof HasProgress) {
+ elementProgress = ((HasProgress) currentTracker).getProgress();
+ } else {
+ elementProgress = Progress.from(0, 1);
+ }
} else {
- elementProgress = Progress.from(0, 1);
+ double elementCompleted = splitDelegate.getProgress();
+ elementProgress = Progress.from(elementCompleted, 1 - elementCompleted);
}
Progress scaledProgress = scaleProgress(elementProgress, currentWindowIndex, stopWindowIndex);
double scaledFractionOfRemainder = scaledProgress.getWorkRemaining() * fractionOfRemainder;
@@ -1498,42 +1385,42 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
watermarkAndState);
} else {
// Compute the element split with the scaled fraction.
- SplitResult<?> elementSplit =
- currentTracker.trySplit(scaledFractionOfRemainder / elementProgress.getWorkRemaining());
- newWindowStopIndex = currentWindowIndex + 1;
- if (elementSplit != null) {
- windowedSplitResult =
- computeWindowSplitResult(
- currentElement,
- currentRestriction,
- currentWindow,
- windows,
- currentWatermarkEstimatorState,
- currentWindowIndex,
- newWindowStopIndex,
- stopWindowIndex,
- elementSplit,
- watermarkAndState);
+ SplitResult<?> elementSplit = null;
+ if (currentTracker != null) {
+ elementSplit =
+ currentTracker.trySplit(
+ scaledFractionOfRemainder / elementProgress.getWorkRemaining());
} else {
- windowedSplitResult =
- computeWindowSplitResult(
- currentElement,
- currentRestriction,
- currentWindow,
- windows,
- currentWatermarkEstimatorState,
- newWindowStopIndex,
- newWindowStopIndex,
- stopWindowIndex,
- null,
- watermarkAndState);
+ downstreamSplitResult = splitDelegate.trySplit(scaledFractionOfRemainder);
}
+ newWindowStopIndex = currentWindowIndex + 1;
+ int toIndex =
+ (elementSplit == null && downstreamSplitResult == null)
+ ? newWindowStopIndex
+ : currentWindowIndex;
+ windowedSplitResult =
+ computeWindowSplitResult(
+ currentElement,
+ currentRestriction,
+ currentWindow,
+ windows,
+ currentWatermarkEstimatorState,
+ toIndex,
+ newWindowStopIndex,
+ stopWindowIndex,
+ elementSplit,
+ watermarkAndState);
}
} else {
// We are on the last window then compute the element split with given fraction.
+ SplitResult<?> elementSplitResult = null;
newWindowStopIndex = stopWindowIndex;
- SplitResult<?> splitResult = currentTracker.trySplit(fractionOfRemainder);
- if (splitResult == null) {
+ if (currentTracker != null) {
+ elementSplitResult = currentTracker.trySplit(fractionOfRemainder);
+ } else {
+ downstreamSplitResult = splitDelegate.trySplit(fractionOfRemainder);
+ }
+ if (elementSplitResult == null && downstreamSplitResult == null) {
return null;
}
windowedSplitResult =
@@ -1546,50 +1433,32 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
currentWindowIndex,
stopWindowIndex,
stopWindowIndex,
- splitResult,
+ elementSplitResult,
watermarkAndState);
}
- return KV.of(windowedSplitResult, newWindowStopIndex);
+ return SplitResultsWithStopIndex.of(
+ windowedSplitResult, downstreamSplitResult, newWindowStopIndex);
}
- private HandlesSplits.SplitResult trySplitForElementAndRestriction(
- double fractionOfRemainder, Duration resumeDelay) {
- KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
- WindowedSplitResult windowedSplitResult = null;
- synchronized (splitLock) {
- // There is nothing to split if we are between element and restriction processing calls.
- if (currentTracker == null) {
- return null;
- }
- // Make sure to get the output watermark before we split to ensure that the lower bound
- // applies to the residual.
- watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
- KV<WindowedSplitResult, Integer> splitResult =
- trySplitForProcess(
- currentElement,
- currentRestriction,
- currentWindow,
- currentWindows,
- currentWatermarkEstimatorState,
- fractionOfRemainder,
- currentTracker,
- watermarkAndState,
- windowCurrentIndex,
- windowStopIndex);
- if (splitResult == null) {
- return null;
- }
- windowStopIndex = splitResult.getValue();
- windowedSplitResult =
- calculateRestrictionSize(
- splitResult.getKey(),
- PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
- + "/GetSize");
- }
-
+ @VisibleForTesting
+ static <WatermarkEstimatorStateT> HandlesSplits.SplitResult constructSplitResult(
+ WindowedSplitResult windowedSplitResult,
+ HandlesSplits.SplitResult downstreamElementSplit,
+ Coder fullInputCoder,
+ Instant initialWatermark,
+ KV<Instant, WatermarkEstimatorStateT> watermarkAndState,
+ String pTransformId,
+ String mainInputId,
+ Collection<String> outputIds,
+ Duration resumeDelay) {
+ // The element split cannot come from both windowedSplitResult and downstreamElementSplit.
+ checkArgument(
+ (windowedSplitResult == null || windowedSplitResult.getResidualSplitRoot() == null)
+ || downstreamElementSplit == null);
List<BundleApplication> primaryRoots = new ArrayList<>();
List<DelayedBundleApplication> residualRoots = new ArrayList<>();
- Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+
+ // Encode window splits.
if (windowedSplitResult != null
&& windowedSplitResult.getPrimaryInFullyProcessedWindowsRoot() != null) {
ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
@@ -1630,7 +1499,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
.setSeconds(initialWatermark.getMillis() / 1000)
.setNanos((int) (initialWatermark.getMillis() % 1000) * 1000000)
.build();
- for (String outputId : pTransform.getOutputsMap().keySet()) {
+ for (String outputId : outputIds) {
outputWatermarkMapForUnprocessedWindows.put(outputId, outputWatermark);
}
}
@@ -1642,9 +1511,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
.build());
}
- if (windowedSplitResult.getResidualSplitRoot() != null) {
- ByteString.Output primaryBytes = ByteString.newOutput();
- ByteString.Output residualBytes = ByteString.newOutput();
+ ByteString.Output primaryBytes = ByteString.newOutput();
+ ByteString.Output residualBytes = ByteString.newOutput();
+ // Encode element split from windowedSplitResult or from downstream element split. It's possible
+ // that there is no element split.
+ if (windowedSplitResult != null && windowedSplitResult.getResidualSplitRoot() != null) {
+ // When there is an element split in windowedSplitResult, resumeDelay must not be null.
+ checkNotNull(resumeDelay);
try {
fullInputCoder.encode(windowedSplitResult.getPrimarySplitRoot(), primaryBytes);
fullInputCoder.encode(windowedSplitResult.getResidualSplitRoot(), residualBytes);
@@ -1670,7 +1543,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
.setSeconds(watermarkAndState.getKey().getMillis() / 1000)
.setNanos((int) (watermarkAndState.getKey().getMillis() % 1000) * 1000000)
.build();
- for (String outputId : pTransform.getOutputsMap().keySet()) {
+ for (String outputId : outputIds) {
outputWatermarkMap.put(outputId, outputWatermark);
}
}
@@ -1680,11 +1553,64 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
.setApplication(residualApplication)
.setRequestedTimeDelay(Durations.fromMillis(resumeDelay.getMillis()))
.build());
+
+ } else if (downstreamElementSplit != null) {
+ primaryRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getPrimaryRoots()));
+ residualRoots.add(Iterables.getOnlyElement(downstreamElementSplit.getResidualRoots()));
}
return HandlesSplits.SplitResult.of(primaryRoots, residualRoots);
}
+ private HandlesSplits.SplitResult trySplitForElementAndRestriction(
+ double fractionOfRemainder, Duration resumeDelay) {
+ KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
+ WindowedSplitResult windowedSplitResult = null;
+ synchronized (splitLock) {
+ // There is nothing to split if we are between element and restriction processing calls.
+ if (currentTracker == null) {
+ return null;
+ }
+ // Make sure to get the output watermark before we split to ensure that the lower bound
+ // applies to the residual.
+ watermarkAndState = currentWatermarkEstimator.getWatermarkAndState();
+ SplitResultsWithStopIndex splitResult =
+ computeSplitForProcessOrTruncate(
+ currentElement,
+ currentRestriction,
+ currentWindow,
+ currentWindows,
+ currentWatermarkEstimatorState,
+ fractionOfRemainder,
+ currentTracker,
+ null,
+ watermarkAndState,
+ windowCurrentIndex,
+ windowStopIndex);
+ if (splitResult == null) {
+ return null;
+ }
+ windowStopIndex = splitResult.getNewWindowStopIndex();
+ // Populate the size of primary/residual.
+ windowedSplitResult =
+ calculateRestrictionSize(
+ splitResult.getWindowSplit(),
+ PTransformTranslation.SPLITTABLE_PROCESS_SIZED_ELEMENTS_AND_RESTRICTIONS_URN
+ + "/GetSize");
+ }
+ Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+ return constructSplitResult(
+ windowedSplitResult,
+ null,
+ fullInputCoder,
+ initialWatermark,
+ watermarkAndState,
+ pTransformId,
+ mainInputId,
+ pTransform.getOutputsMap().keySet(),
+ resumeDelay);
+ }
+
private <K> void processTimer(
String timerIdOrTimerFamilyId, TimeDomain timeDomain, Timer<K> timer) {
currentTimer = timer;
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index b388587..27a7a76 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -50,6 +50,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import org.apache.beam.fn.harness.FnApiDoFnRunner.SplitResultsWithStopIndex;
import org.apache.beam.fn.harness.FnApiDoFnRunner.WindowedSplitResult;
import org.apache.beam.fn.harness.HandlesSplits.SplitResult;
import org.apache.beam.fn.harness.PTransformRunnerFactory.ProgressRequestCallback;
@@ -82,8 +83,11 @@ import org.apache.beam.runners.core.metrics.SimpleMonitoringInfoBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.DoubleCoder;
+import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.IterableCoder;
+import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.fn.data.FnDataReceiver;
import org.apache.beam.sdk.fn.data.LogicalEndpoint;
import org.apache.beam.sdk.function.ThrowingRunnable;
@@ -134,6 +138,7 @@ import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.ByteString;
+import org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.util.Durations;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
@@ -145,6 +150,7 @@ import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
+import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
@@ -3768,14 +3774,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
@RunWith(JUnit4.class)
public static class SplitTest {
+ @Rule public final ExpectedException expected = ExpectedException.none();
private IntervalWindow window1;
private IntervalWindow window2;
private IntervalWindow window3;
private WindowedValue<String> currentElement;
private OffsetRange currentRestriction;
private Instant currentWatermarkEstimatorState;
+ private Instant initialWatermark;
KV<Instant, Instant> watermarkAndState;
+ private static final String PROCESS_TRANSFORM_ID = "processPTransformId";
+ private static final String TRUNCATE_TRANSFORM_ID = "truncatePTransformId";
+ private static final String PROCESS_INPUT_ID = "processInputId";
+ private static final String TRUNCATE_INPUT_ID = "truncateInputId";
+ private static final String PROCESS_OUTPUT_ID = "processOutputId";
+ private static final String TRUNCATE_OUTPUT_ID = "truncateOutputId";
+
private KV<WindowedValue, WindowedValue> createSplitInWindow(
OffsetRange primaryRestriction, OffsetRange residualRestriction, BoundedWindow window) {
return KV.of(
@@ -3818,6 +3833,56 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentElement.getPane()));
}
+ private KV<WindowedValue, WindowedValue> createSplitWithSizeInWindow(
+ OffsetRange primaryRestriction, OffsetRange residualRestriction, BoundedWindow window) {
+ return KV.of(
+ WindowedValue.of(
+ KV.of(
+ KV.of(
+ currentElement.getValue(),
+ KV.of(primaryRestriction, currentWatermarkEstimatorState)),
+ (double) (primaryRestriction.getTo() - primaryRestriction.getFrom())),
+ currentElement.getTimestamp(),
+ window,
+ currentElement.getPane()),
+ WindowedValue.of(
+ KV.of(
+ KV.of(
+ currentElement.getValue(),
+ KV.of(residualRestriction, watermarkAndState.getValue())),
+ (double) (residualRestriction.getTo() - residualRestriction.getFrom())),
+ currentElement.getTimestamp(),
+ window,
+ currentElement.getPane()));
+ }
+
+ private KV<WindowedValue, WindowedValue> createSplitWithSizeAcrossWindows(
+ List<BoundedWindow> primaryWindows, List<BoundedWindow> residualWindows) {
+ return KV.of(
+ primaryWindows.isEmpty()
+ ? null
+ : WindowedValue.of(
+ KV.of(
+ KV.of(
+ currentElement.getValue(),
+ KV.of(currentRestriction, currentWatermarkEstimatorState)),
+ (double) (currentRestriction.getTo() - currentRestriction.getFrom())),
+ currentElement.getTimestamp(),
+ primaryWindows,
+ currentElement.getPane()),
+ residualWindows.isEmpty()
+ ? null
+ : WindowedValue.of(
+ KV.of(
+ KV.of(
+ currentElement.getValue(),
+ KV.of(currentRestriction, currentWatermarkEstimatorState)),
+ (double) (currentRestriction.getTo() - currentRestriction.getFrom())),
+ currentElement.getTimestamp(),
+ residualWindows,
+ currentElement.getPane()));
+ }
+
@Before
public void setUp() {
window1 = new IntervalWindow(Instant.ofEpochMilli(0), Instant.ofEpochMilli(10));
@@ -3831,11 +3896,12 @@ public class FnApiDoFnRunnerTest implements Serializable {
PaneInfo.NO_FIRING);
currentRestriction = new OffsetRange(0L, 100L);
currentWatermarkEstimatorState = Instant.ofEpochMilli(21);
+ initialWatermark = Instant.ofEpochMilli(25);
watermarkAndState = KV.of(Instant.ofEpochMilli(42), Instant.ofEpochMilli(42));
}
@Test
- public void testScaleProgress() throws Exception {
+ public void testScaledProgress() throws Exception {
Progress elementProgress = Progress.from(2, 8);
// There is only one window.
Progress scaledResult = FnApiDoFnRunner.scaleProgress(elementProgress, 0, 1);
@@ -3859,12 +3925,66 @@ public class FnApiDoFnRunnerTest implements Serializable {
}
@Test
+ public void testComputeSplitForProcessOrTruncateWithNullTrackerAndSplitDelegate()
+ throws Exception {
+ expected.expect(IllegalArgumentException.class);
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+ currentElement,
+ currentRestriction,
+ window1,
+ ImmutableList.copyOf(currentElement.getWindows()),
+ currentWatermarkEstimatorState,
+ 0.0,
+ null,
+ null,
+ null,
+ 0,
+ 3);
+ }
+
+ @Test
+ public void testComputeSplitForProcessOrTruncateWithNotNullTrackerAndDelegate()
+ throws Exception {
+ expected.expect(IllegalArgumentException.class);
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+ currentElement,
+ currentRestriction,
+ window1,
+ ImmutableList.copyOf(currentElement.getWindows()),
+ currentWatermarkEstimatorState,
+ 0.0,
+ new OffsetRangeTracker(currentRestriction),
+ createSplitDelegate(0.3, 0.0, null),
+ null,
+ 0,
+ 3);
+ }
+
+ @Test
+ public void testComputeSplitForProcessOrTruncateWithInvalidWatermarkAndState()
+ throws Exception {
+ expected.expect(NullPointerException.class);
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
+ currentElement,
+ currentRestriction,
+ window1,
+ ImmutableList.copyOf(currentElement.getWindows()),
+ currentWatermarkEstimatorState,
+ 0.0,
+ new OffsetRangeTracker(currentRestriction),
+ null,
+ null,
+ 0,
+ 3);
+ }
+
+ @Test
public void testTrySplitForProcessCheckpointOnFirstWindow() throws Exception {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.<Instant>computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
@@ -3872,20 +3992,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.0,
tracker,
+ null,
watermarkAndState,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedElementSplit =
createSplitInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window1);
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
- assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
- assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+ assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+ assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -3893,8 +4016,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.<Instant>computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
@@ -3902,20 +4025,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.0,
tracker,
+ null,
watermarkAndState,
0,
2);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedElementSplit =
createSplitInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window1);
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2));
- assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
- assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+ assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+ assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -3923,8 +4049,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.<Instant>computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
@@ -3932,20 +4058,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.2,
tracker,
+ null,
watermarkAndState,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedElementSplit =
createSplitInWindow(new OffsetRange(0, 84), new OffsetRange(84, 100), window1);
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
- assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
- assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+ assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+ assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -3953,8 +4082,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window2,
@@ -3962,22 +4091,25 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.2,
tracker,
+ null,
watermarkAndState,
1,
3);
- assertEquals(2, (int) result.getValue());
+ assertEquals(2, result.getNewWindowStopIndex());
// Java uses BigDecimal so 30 + 0.2 * 170 = 63.9...
// BigDecimal.longValue() will round down to 63 instead of the expected 64
KV<WindowedValue, WindowedValue> expectedElementSplit =
createSplitInWindow(new OffsetRange(0, 63), new OffsetRange(63, 100), window2);
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
- assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
- assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+ assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+ assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -3985,8 +4117,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window3,
@@ -3994,20 +4126,23 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.2,
tracker,
+ null,
watermarkAndState,
2,
3);
- assertEquals(3, (int) result.getValue());
+ assertEquals(3, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedElementSplit =
createSplitInWindow(new OffsetRange(0, 44), new OffsetRange(44, 100), window3);
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of());
- assertEquals(expectedElementSplit.getKey(), result.getKey().getPrimarySplitRoot());
- assertEquals(expectedElementSplit.getValue(), result.getKey().getResidualSplitRoot());
+ assertEquals(expectedElementSplit.getKey(), result.getWindowSplit().getPrimarySplitRoot());
+ assertEquals(expectedElementSplit.getValue(), result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4016,8 +4151,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(100L);
assertNull(tracker.trySplit(0.0));
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window3,
@@ -4025,18 +4160,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0,
tracker,
+ null,
watermarkAndState,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
- assertNull(result.getKey().getPrimarySplitRoot());
- assertNull(result.getKey().getResidualSplitRoot());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4045,8 +4183,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(100L);
assertNull(tracker.trySplit(0.0));
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window3,
@@ -4054,6 +4192,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0,
tracker,
+ null,
watermarkAndState,
2,
3);
@@ -4065,8 +4204,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window2,
@@ -4074,18 +4213,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.6,
tracker,
+ null,
watermarkAndState,
0,
3);
- assertEquals(2, (int) result.getValue());
+ assertEquals(2, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
- assertNull(result.getKey().getPrimarySplitRoot());
- assertNull(result.getKey().getResidualSplitRoot());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4093,8 +4235,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window2,
@@ -4102,18 +4244,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.3,
tracker,
+ null,
watermarkAndState,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
- assertNull(result.getKey().getPrimarySplitRoot());
- assertNull(result.getKey().getResidualSplitRoot());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4121,8 +4266,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
OffsetRangeTracker tracker = new OffsetRangeTracker(currentRestriction);
tracker.tryClaim(30L);
- KV<WindowedSplitResult, Integer> result =
- FnApiDoFnRunner.trySplitForProcess(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window2,
@@ -4130,18 +4275,21 @@ public class FnApiDoFnRunnerTest implements Serializable {
currentWatermarkEstimatorState,
0.9,
tracker,
+ null,
watermarkAndState,
0,
3);
- assertEquals(2, (int) result.getValue());
+ assertEquals(2, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
- assertNull(result.getKey().getPrimarySplitRoot());
- assertNull(result.getKey().getResidualSplitRoot());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
- expectedWindowSplit.getKey(), result.getKey().getPrimaryInFullyProcessedWindowsRoot());
+ expectedWindowSplit.getKey(),
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
- expectedWindowSplit.getValue(), result.getKey().getResidualInUnprocessedWindowsRoot());
+ expectedWindowSplit.getValue(),
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
private HandlesSplits createSplitDelegate(
@@ -4168,27 +4316,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, splitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.0,
+ null,
splitDelegate,
+ null,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
- assertEquals(splitResult, result.getKey().getValue());
+ assertEquals(splitResult, result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4199,27 +4351,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, splitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.0,
+ null,
splitDelegate,
+ null,
0,
2);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2));
- assertEquals(splitResult, result.getKey().getValue());
+ assertEquals(splitResult, result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4230,27 +4386,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.54, splitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.2,
+ null,
splitDelegate,
+ null,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(), ImmutableList.of(window2, window3));
- assertEquals(splitResult, result.getKey().getValue());
+ assertEquals(splitResult, result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4261,27 +4421,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.34, splitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.2,
+ null,
splitDelegate,
+ null,
1,
3);
- assertEquals(2, (int) result.getValue());
+ assertEquals(2, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
- assertEquals(splitResult, result.getKey().getValue());
+ assertEquals(splitResult, result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4292,27 +4456,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.2, splitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.2,
+ null,
splitDelegate,
+ null,
2,
3);
- assertEquals(3, (int) result.getValue());
+ assertEquals(3, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of());
- assertEquals(splitResult, result.getKey().getValue());
+ assertEquals(splitResult, result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4323,42 +4491,48 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(1.0, 0.0, unusedSplitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.0,
+ null,
splitDelegate,
+ null,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
- assertNull(result.getKey().getValue());
+ assertNull(result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
public void testTrySplitForTruncateSplitOnLastWindowWhenNoElementSplit() throws Exception {
List<BoundedWindow> windows = ImmutableList.copyOf(currentElement.getWindows());
HandlesSplits splitDelegate = createSplitDelegate(1.0, 0.0, null);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.0,
+ null,
splitDelegate,
+ null,
2,
3);
assertNull(result);
@@ -4372,27 +4546,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, unusedSplitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.6,
+ null,
splitDelegate,
+ null,
0,
3);
- assertEquals(2, (int) result.getValue());
+ assertEquals(2, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
- assertNull(result.getKey().getValue());
+ assertNull(result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4403,27 +4581,31 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, unusedSplitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.3,
+ null,
splitDelegate,
+ null,
0,
3);
- assertEquals(1, (int) result.getValue());
+ assertEquals(1, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window2, window3));
- assertNull(result.getKey().getValue());
+ assertNull(result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
}
@Test
@@ -4434,27 +4616,313 @@ public class FnApiDoFnRunnerTest implements Serializable {
ImmutableList.of(BundleApplication.getDefaultInstance()),
ImmutableList.of(DelayedBundleApplication.getDefaultInstance()));
HandlesSplits splitDelegate = createSplitDelegate(0.3, 0.0, unusedSplitResult);
- KV<KV<WindowedSplitResult, SplitResult>, Integer> result =
- FnApiDoFnRunner.trySplitForTruncate(
+ SplitResultsWithStopIndex result =
+ FnApiDoFnRunner.computeSplitForProcessOrTruncate(
currentElement,
currentRestriction,
window1,
windows,
currentWatermarkEstimatorState,
0.6,
+ null,
splitDelegate,
+ null,
0,
3);
- assertEquals(2, (int) result.getValue());
+ assertEquals(2, result.getNewWindowStopIndex());
KV<WindowedValue, WindowedValue> expectedWindowSplit =
createSplitAcrossWindows(ImmutableList.of(window1, window2), ImmutableList.of(window3));
- assertNull(result.getKey().getValue());
+ assertNull(result.getDownstreamSplit());
+ assertNull(result.getWindowSplit().getPrimarySplitRoot());
+ assertNull(result.getWindowSplit().getResidualSplitRoot());
assertEquals(
expectedWindowSplit.getKey(),
- result.getKey().getKey().getPrimaryInFullyProcessedWindowsRoot());
+ result.getWindowSplit().getPrimaryInFullyProcessedWindowsRoot());
assertEquals(
expectedWindowSplit.getValue(),
- result.getKey().getKey().getResidualInUnprocessedWindowsRoot());
+ result.getWindowSplit().getResidualInUnprocessedWindowsRoot());
+ }
+
+ @Test
+ public void testConstructSplitResultWithInvalidElementSplits() throws Exception {
+ expected.expect(IllegalArgumentException.class);
+ FnApiDoFnRunner.constructSplitResult(
+ WindowedSplitResult.forRoots(
+ null,
+ WindowedValue.valueInGlobalWindow("elementPrimary"),
+ WindowedValue.valueInGlobalWindow("elementResidual"),
+ null),
+ HandlesSplits.SplitResult.of(
+ ImmutableList.of(BundleApplication.getDefaultInstance()),
+ ImmutableList.of(DelayedBundleApplication.getDefaultInstance())),
+ WindowedValue.getFullCoder(VoidCoder.of(), GlobalWindow.Coder.INSTANCE),
+ Instant.now(),
+ null,
+ "ptransformId",
+ "inputId",
+ ImmutableList.of("outputId"),
+ null);
+ }
+
+ private Coder getFullInputCoder(
+ Coder elementCoder, Coder restrictionCoder, Coder watermarkStateCoder, Coder windowCoder) {
+ Coder inputCoder =
+ KvCoder.of(
+ KvCoder.of(elementCoder, KvCoder.of(restrictionCoder, watermarkStateCoder)),
+ DoubleCoder.of());
+ return WindowedValue.getFullCoder(inputCoder, windowCoder);
+ }
+
+ private HandlesSplits.SplitResult getProcessElementSplit(String transformId, String inputId) {
+ return SplitResult.of(
+ ImmutableList.of(
+ BundleApplication.newBuilder()
+ .setTransformId(transformId)
+ .setInputId(inputId)
+ .build()),
+ ImmutableList.of(
+ DelayedBundleApplication.newBuilder()
+ .setApplication(
+ BundleApplication.newBuilder()
+ .setTransformId(transformId)
+ .setInputId(inputId)
+ .build())
+ .setRequestedTimeDelay(Durations.fromMillis(1000L))
+ .build()));
+ }
+
+ private org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp toTimestamp(
+ Instant time) {
+ return org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Timestamp.newBuilder()
+ .setSeconds(time.getMillis() / 1000)
+ .setNanos((int) (time.getMillis() % 1000) * 1000000)
+ .build();
+ }
+
+ @Test
+ public void testConstructSplitResultWithElementSplitFromDelegate() throws Exception {
+ Coder fullInputCoder =
+ getFullInputCoder(
+ StringUtf8Coder.of(),
+ OffsetRange.Coder.of(),
+ InstantCoder.of(),
+ IntervalWindow.getCoder());
+ HandlesSplits.SplitResult elementSplit =
+ getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
+ HandlesSplits.SplitResult result =
+ FnApiDoFnRunner.constructSplitResult(
+ null,
+ elementSplit,
+ fullInputCoder,
+ null,
+ null,
+ TRUNCATE_TRANSFORM_ID,
+ TRUNCATE_INPUT_ID,
+ ImmutableList.of(TRUNCATE_OUTPUT_ID),
+ null);
+ assertEquals(elementSplit.getPrimaryRoots(), result.getPrimaryRoots());
+ assertEquals(elementSplit.getResidualRoots(), result.getResidualRoots());
+ }
+
+ @Test
+ public void testConstructSplitResultWithElementSplitFromTracker() throws Exception {
+ Coder fullInputCoder =
+ getFullInputCoder(
+ StringUtf8Coder.of(),
+ OffsetRange.Coder.of(),
+ InstantCoder.of(),
+ IntervalWindow.getCoder());
+ KV<WindowedValue, WindowedValue> elementSplit =
+ createSplitWithSizeInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window1);
+ HandlesSplits.SplitResult result =
+ FnApiDoFnRunner.constructSplitResult(
+ WindowedSplitResult.forRoots(
+ null, elementSplit.getKey(), elementSplit.getValue(), null),
+ null,
+ fullInputCoder,
+ null,
+ watermarkAndState,
+ PROCESS_TRANSFORM_ID,
+ PROCESS_INPUT_ID,
+ ImmutableList.of(PROCESS_OUTPUT_ID),
+ Duration.millis(100L));
+ assertEquals(1, result.getPrimaryRoots().size());
+ BundleApplication primaryRoot = result.getPrimaryRoots().get(0);
+ assertEquals(PROCESS_TRANSFORM_ID, primaryRoot.getTransformId());
+ assertEquals(PROCESS_INPUT_ID, primaryRoot.getInputId());
+ assertEquals(
+ elementSplit.getKey(), fullInputCoder.decode(primaryRoot.getElement().newInput()));
+
+ assertEquals(1, result.getResidualRoots().size());
+ DelayedBundleApplication residualRoot = result.getResidualRoots().get(0);
+ assertEquals(Durations.fromMillis(100L), residualRoot.getRequestedTimeDelay());
+ assertEquals(PROCESS_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
+ assertEquals(PROCESS_INPUT_ID, residualRoot.getApplication().getInputId());
+ assertEquals(
+ toTimestamp(watermarkAndState.getValue()),
+ residualRoot.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+ assertEquals(
+ elementSplit.getValue(),
+ fullInputCoder.decode(residualRoot.getApplication().getElement().newInput()));
+ }
+
+ @Test
+ public void testConstructSplitResultWithOnlyWindowSplits() throws Exception {
+ Coder fullInputCoder =
+ getFullInputCoder(
+ StringUtf8Coder.of(),
+ OffsetRange.Coder.of(),
+ InstantCoder.of(),
+ IntervalWindow.getCoder());
+ KV<WindowedValue, WindowedValue> windowSplit =
+ createSplitWithSizeAcrossWindows(
+ ImmutableList.of(window1), ImmutableList.of(window2, window3));
+ HandlesSplits.SplitResult result =
+ FnApiDoFnRunner.constructSplitResult(
+ WindowedSplitResult.forRoots(
+ windowSplit.getKey(), null, null, windowSplit.getValue()),
+ null,
+ fullInputCoder,
+ initialWatermark,
+ watermarkAndState,
+ PROCESS_TRANSFORM_ID,
+ PROCESS_INPUT_ID,
+ ImmutableList.of(PROCESS_OUTPUT_ID),
+ Duration.millis(100L));
+ assertEquals(1, result.getPrimaryRoots().size());
+ BundleApplication primaryRoot = result.getPrimaryRoots().get(0);
+ assertEquals(PROCESS_TRANSFORM_ID, primaryRoot.getTransformId());
+ assertEquals(PROCESS_INPUT_ID, primaryRoot.getInputId());
+ assertEquals(
+ windowSplit.getKey(), fullInputCoder.decode(primaryRoot.getElement().newInput()));
+
+ assertEquals(1, result.getResidualRoots().size());
+ DelayedBundleApplication residualRoot = result.getResidualRoots().get(0);
+ assertEquals(
+ org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Duration.getDefaultInstance(),
+ residualRoot.getRequestedTimeDelay());
+ assertEquals(PROCESS_TRANSFORM_ID, residualRoot.getApplication().getTransformId());
+ assertEquals(PROCESS_INPUT_ID, residualRoot.getApplication().getInputId());
+ assertEquals(
+ toTimestamp(initialWatermark),
+ residualRoot.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+ assertEquals(
+ windowSplit.getValue(),
+ fullInputCoder.decode(residualRoot.getApplication().getElement().newInput()));
+ }
+
+ @Test // Window-level and element-level roots are both supplied through WindowedSplitResult.
+ public void testConstructSplitResultWithElementAndWindowSplitFromProcess() throws Exception {
+ Coder fullInputCoder =
+ getFullInputCoder(
+ StringUtf8Coder.of(),
+ OffsetRange.Coder.of(),
+ InstantCoder.of(),
+ IntervalWindow.getCoder());
+ KV<WindowedValue, WindowedValue> windowSplit = // key is asserted as primary, value as residual
+ createSplitWithSizeAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
+ KV<WindowedValue, WindowedValue> elementSplit = // key is asserted as primary, value as residual
+ createSplitWithSizeInWindow(new OffsetRange(0, 31), new OffsetRange(31, 100), window2);
+ HandlesSplits.SplitResult result =
+ FnApiDoFnRunner.constructSplitResult(
+ WindowedSplitResult.forRoots(
+ windowSplit.getKey(),
+ elementSplit.getKey(),
+ elementSplit.getValue(),
+ windowSplit.getValue()),
+ null, // no HandlesSplits.SplitResult is passed in this position for the process case
+ fullInputCoder,
+ initialWatermark,
+ watermarkAndState,
+ PROCESS_TRANSFORM_ID,
+ PROCESS_INPUT_ID,
+ ImmutableList.of(PROCESS_OUTPUT_ID),
+ Duration.millis(100L)); // asserted below as the element residual's requested time delay
+ assertEquals(2, result.getPrimaryRoots().size()); // index 0: window split, index 1: element split
+ BundleApplication windowPrimary = result.getPrimaryRoots().get(0);
+ BundleApplication elementPrimary = result.getPrimaryRoots().get(1);
+ assertEquals(PROCESS_TRANSFORM_ID, windowPrimary.getTransformId());
+ assertEquals(PROCESS_INPUT_ID, windowPrimary.getInputId());
+ assertEquals(
+ windowSplit.getKey(), fullInputCoder.decode(windowPrimary.getElement().newInput()));
+ assertEquals(PROCESS_TRANSFORM_ID, elementPrimary.getTransformId());
+ assertEquals(PROCESS_INPUT_ID, elementPrimary.getInputId());
+ assertEquals(
+ elementSplit.getKey(), fullInputCoder.decode(elementPrimary.getElement().newInput()));
+
+ assertEquals(2, result.getResidualRoots().size()); // same ordering as the primary roots
+ DelayedBundleApplication windowResidual = result.getResidualRoots().get(0);
+ DelayedBundleApplication elementResidual = result.getResidualRoots().get(1);
+ assertEquals( // window residual: requested delay is the default Duration instance
+ org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Duration.getDefaultInstance(),
+ windowResidual.getRequestedTimeDelay());
+ assertEquals(PROCESS_TRANSFORM_ID, windowResidual.getApplication().getTransformId());
+ assertEquals(PROCESS_INPUT_ID, windowResidual.getApplication().getInputId());
+ assertEquals( // window residual: output watermark equals the initial watermark
+ toTimestamp(initialWatermark),
+ windowResidual.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+ assertEquals(
+ windowSplit.getValue(),
+ fullInputCoder.decode(windowResidual.getApplication().getElement().newInput()));
+ assertEquals(Durations.fromMillis(100L), elementResidual.getRequestedTimeDelay());
+ assertEquals(PROCESS_TRANSFORM_ID, elementResidual.getApplication().getTransformId());
+ assertEquals(PROCESS_INPUT_ID, elementResidual.getApplication().getInputId());
+ assertEquals( // element residual: output watermark equals watermarkAndState.getValue()
+ toTimestamp(watermarkAndState.getValue()),
+ elementResidual.getApplication().getOutputWatermarksMap().get(PROCESS_OUTPUT_ID));
+ assertEquals(
+ elementSplit.getValue(),
+ fullInputCoder.decode(elementResidual.getApplication().getElement().newInput()));
+ }
+
+ @Test // Window roots come from WindowedSplitResult; element roots come from a separate SplitResult.
+ public void testConstructSplitResultWithElementAndWindowSplitFromTruncate() throws Exception {
+ Coder fullInputCoder =
+ getFullInputCoder(
+ StringUtf8Coder.of(),
+ OffsetRange.Coder.of(),
+ InstantCoder.of(),
+ IntervalWindow.getCoder());
+ KV<WindowedValue, WindowedValue> windowSplit = // key is asserted as primary, value as residual
+ createSplitWithSizeAcrossWindows(ImmutableList.of(window1), ImmutableList.of(window3));
+ HandlesSplits.SplitResult elementSplit = // built with PROCESS_* ids; the call below uses TRUNCATE_*
+ getProcessElementSplit(PROCESS_TRANSFORM_ID, PROCESS_INPUT_ID);
+ HandlesSplits.SplitResult result =
+ FnApiDoFnRunner.constructSplitResult(
+ WindowedSplitResult.forRoots( // the two middle roots are null in this case
+ windowSplit.getKey(), null, null, windowSplit.getValue()),
+ elementSplit, // the element split is handed over as a HandlesSplits.SplitResult
+ fullInputCoder,
+ initialWatermark,
+ watermarkAndState,
+ TRUNCATE_TRANSFORM_ID,
+ TRUNCATE_INPUT_ID,
+ ImmutableList.of(TRUNCATE_OUTPUT_ID),
+ Duration.millis(100L));
+ assertEquals(2, result.getPrimaryRoots().size()); // index 0: window split, index 1: element split
+ BundleApplication windowPrimary = result.getPrimaryRoots().get(0);
+ BundleApplication elementPrimary = result.getPrimaryRoots().get(1);
+ assertEquals(TRUNCATE_TRANSFORM_ID, windowPrimary.getTransformId());
+ assertEquals(TRUNCATE_INPUT_ID, windowPrimary.getInputId());
+ assertEquals(
+ windowSplit.getKey(), fullInputCoder.decode(windowPrimary.getElement().newInput()));
+ assertEquals(elementSplit.getPrimaryRoots().get(0), elementPrimary); // equals the supplied root
+
+ assertEquals(2, result.getResidualRoots().size()); // same ordering as the primary roots
+ DelayedBundleApplication windowResidual = result.getResidualRoots().get(0);
+ DelayedBundleApplication elementResidual = result.getResidualRoots().get(1);
+ assertEquals( // window residual: requested delay is the default Duration instance
+ org.apache.beam.vendor.grpc.v1p26p0.com.google.protobuf.Duration.getDefaultInstance(),
+ windowResidual.getRequestedTimeDelay());
+ assertEquals(TRUNCATE_TRANSFORM_ID, windowResidual.getApplication().getTransformId());
+ assertEquals(TRUNCATE_INPUT_ID, windowResidual.getApplication().getInputId());
+ assertEquals( // window residual: output watermark equals the initial watermark
+ toTimestamp(initialWatermark),
+ windowResidual.getApplication().getOutputWatermarksMap().get(TRUNCATE_OUTPUT_ID));
+ assertEquals(
+ windowSplit.getValue(),
+ fullInputCoder.decode(windowResidual.getApplication().getElement().newInput()));
+ assertEquals(elementSplit.getResidualRoots().get(0), elementResidual); // equals the supplied root
}
}
}