Posted to github@beam.apache.org by GitBox <gi...@apache.org> on 2020/08/03 21:30:26 UTC

[GitHub] [beam] lukecwik commented on a change in pull request #12419: [BEAM-10303] Handle split when truncate observes windows.

lukecwik commented on a change in pull request #12419:
URL: https://github.com/apache/beam/pull/12419#discussion_r464651698



##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -515,15 +515,18 @@
               && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
+                  private final HandlesSplits splitDelegate =
+                      (HandlesSplits) Iterables.get(mainOutputConsumers, 0);

Review comment:
       nit: here and below on line 550
   ```suggestion
                         (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
   ```
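
For context on this nit: `Iterables.get(iterable, 0)` returns the first element even when more are present, whereas Guava's `Iterables.getOnlyElement` throws unless there is exactly one element. That makes the "single main output consumer" assumption fail loudly. The sketch below mirrors that contract using only the JDK. `OnlyElementSketch.onlyElement` is a hypothetical stand-in written for illustration, not Guava's implementation and not part of the PR.

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Illustrative stand-in for the contract of Guava's Iterables.getOnlyElement. */
final class OnlyElementSketch {
  private OnlyElementSketch() {}

  /** Returns the single element, or throws if there are zero or several. */
  static <T> T onlyElement(Iterable<T> iterable) {
    Iterator<T> it = iterable.iterator();
    if (!it.hasNext()) {
      throw new NoSuchElementException("expected one element but was empty");
    }
    T first = it.next();
    if (it.hasNext()) {
      // A plain get(iterable, 0) would hide this situation.
      throw new IllegalArgumentException("expected one element but found more");
    }
    return first;
  }
}
```

With a one-element list the helper returns that element; with two or more it throws `IllegalArgumentException` instead of quietly picking the first.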

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedResidualRoot.getApplication().getTransformId())

Review comment:
       Ditto: we should be using the `truncate` transform id and the `truncate` input id.

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())

Review comment:
       we should be using the `truncate` transform id and not the `process sized elements` transform id
   
   This will remove the assumption about what the input coder is and we can append our additional primaries and residuals on top of any additional residuals/primaries added by the downstream split.
   
   Doing this might require fixing Dataflow runner v2.
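
A minimal sketch of the "append on top" idea, assuming simplified stand-in types: `Root` and `Split` below are hypothetical plain-Java substitutes for `BundleApplication`/`DelayedBundleApplication` and `HandlesSplits.SplitResult`, and elements are plain strings rather than encoded bytes. The point it illustrates is only that the downstream roots are kept untouched while the extra window roots are tagged with the `truncate` transform id and input id.

```java
import java.util.ArrayList;
import java.util.List;

final class SplitMergeSketch {
  private SplitMergeSketch() {}

  /** Hypothetical stand-in for a root: which transform and input re-process the element. */
  static final class Root {
    final String transformId;
    final String inputId;
    final String element;

    Root(String transformId, String inputId, String element) {
      this.transformId = transformId;
      this.inputId = inputId;
      this.element = element;
    }
  }

  /** Hypothetical stand-in for a split result. */
  static final class Split {
    final List<Root> primaryRoots;
    final List<Root> residualRoots;

    Split(List<Root> primaryRoots, List<Root> residualRoots) {
      this.primaryRoots = primaryRoots;
      this.residualRoots = residualRoots;
    }
  }

  /**
   * Keeps every root produced by the downstream split and appends the roots for fully
   * processed and unprocessed windows, addressed to the truncate transform. A null
   * element means there are no windows of that kind.
   */
  static Split appendWindowRoots(
      Split downstream,
      String truncateTransformId,
      String truncateInputId,
      String processedWindowsElement,
      String unprocessedWindowsElement) {
    List<Root> primaries = new ArrayList<>(downstream.primaryRoots);
    List<Root> residuals = new ArrayList<>(downstream.residualRoots);
    if (processedWindowsElement != null) {
      primaries.add(new Root(truncateTransformId, truncateInputId, processedWindowsElement));
    }
    if (unprocessedWindowsElement != null) {
      residuals.add(new Root(truncateTransformId, truncateInputId, unprocessedWindowsElement));
    }
    return new Split(primaries, residuals);
  }
}
```

Because the downstream lists are copied rather than unpacked with `getOnlyElement`, this shape makes no assumption about how many roots the downstream split returns.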

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {
+    WindowedValue primaryInFullyProcessedWindowsRoot;
+    WindowedValue residualInUnprocessedWindowsRoot;
+    // Note that the assumption here is the fullInputCoder of the Truncate transform should be the
+    // the same as the SDF/Process transform.
+    Coder fullInputCoder = WindowedValue.getFullCoder(inputCoder, windowCoder);
+    BundleApplication windowedPrimaryRoot;
+    DelayedBundleApplication windowedResidualRoot;
+    synchronized (splitLock) {
+      // There is nothing to split if we are between truncate processing calls.
+      if (currentWindowIterator == null) {
+        return null;
+      }
+      HandlesSplits.SplitResult splitResult = splitDelegate.trySplit(fractionOfRemainder);
+      if (splitResult == null) {
+        return null;
+      }
+
+      windowedPrimaryRoot = Iterables.getOnlyElement(splitResult.getPrimaryRoots());
+      windowedResidualRoot = Iterables.getOnlyElement(splitResult.getResidualRoots());
+
+      // We have a successful split from downstream sdf process.
+      // Convert the split taking into account the processed windows, the current window and the
+      // yet to be processed windows.
+      List<BoundedWindow> primaryFullyProcessedWindows =
+          ImmutableList.copyOf(
+              Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      // Advances the iterator consuming the remaining windows.
+      List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
+      // If the window has been observed then the splitAndSize method would have already
+      // output sizes for each window separately.
+      //
+      // TODO: Consider using the original size on the element instead of recomputing
+      // this here.
+      double fullSize =
+          primaryFullyProcessedWindows.isEmpty() && residualUnprocessedWindows.isEmpty()
+              ? 0
+              : doFnInvoker.invokeGetSize(
+                  new DelegatingArgumentProvider<InputT, OutputT>(
+                      processContext,
+                      PTransformTranslation.SPLITTABLE_TRUNCATE_SIZED_RESTRICTION_URN
+                          + "/GetPrimarySize") {
+                    @Override
+                    public Object restriction() {
+                      return currentRestriction;
+                    }
+
+                    @Override
+                    public RestrictionTracker<?, ?> restrictionTracker() {
+                      return doFnInvoker.invokeNewTracker(this);
+                    }
+                  });
+      primaryInFullyProcessedWindowsRoot =
+          primaryFullyProcessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  primaryFullyProcessedWindows,
+                  currentElement.getPane());
+      residualInUnprocessedWindowsRoot =
+          residualUnprocessedWindows.isEmpty()
+              ? null
+              : WindowedValue.of(
+                  KV.of(
+                      KV.of(
+                          currentElement.getValue(),
+                          KV.of(currentRestriction, currentWatermarkEstimatorState)),
+                      fullSize),
+                  currentElement.getTimestamp(),
+                  residualUnprocessedWindows,
+                  currentElement.getPane());
+    }
+
+    List<BundleApplication> primaryRoots = new ArrayList<>();
+    List<DelayedBundleApplication> residualRoots = new ArrayList<>();
+
+    if (primaryInFullyProcessedWindowsRoot != null) {
+      ByteString.Output primaryInOtherWindowsBytes = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(primaryInFullyProcessedWindowsRoot, primaryInOtherWindowsBytes);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder primaryApplicationInOtherWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedPrimaryRoot.getTransformId())
+              .setInputId(windowedPrimaryRoot.getInputId())
+              .setElement(primaryInOtherWindowsBytes.toByteString());
+      primaryRoots.add(primaryApplicationInOtherWindows.build());
+    }
+    if (residualInUnprocessedWindowsRoot != null) {
+      ByteString.Output residualInUnprocessedWindowsBytesOut = ByteString.newOutput();
+      try {
+        fullInputCoder.encode(
+            residualInUnprocessedWindowsRoot, residualInUnprocessedWindowsBytesOut);
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+      BundleApplication.Builder residualApplicationInUnprocessedWindows =
+          BundleApplication.newBuilder()
+              .setTransformId(windowedResidualRoot.getApplication().getTransformId())
+              .setInputId(windowedResidualRoot.getApplication().getInputId())
+              .setElement(residualInUnprocessedWindowsBytesOut.toByteString());
+      // We don't want to change the output watermarks or set the checkpoint resume time since
+      // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       We should always be using the initial watermark state for the unprocessed windows and not reporting any watermark for these residual roots since it should be the same as it was.
   ```suggestion
   ```

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1193,6 +1328,7 @@ public Object restriction() {
               .setElement(bytesOut.toByteString());
       // We don't want to change the output watermarks or set the checkpoint resume time since
       // that applies to the current window.
+      // TODO: Consider using currentWatermark in unprocessed window?

Review comment:
       Same as above.
   ```suggestion
   ```

##########
File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
##########
@@ -1035,6 +1046,130 @@ private Progress getProgress() {
     return null;
   }
 
+  private HandlesSplits.SplitResult trySplitForWindowObservingTruncateRestriction(
+      double fractionOfRemainder, HandlesSplits splitDelegate) {

Review comment:
       We should be scaling the fraction of the remainder relative to the number of windows we have to find the best split point.
   
   This logic should essentially mirror https://github.com/apache/beam/blob/13c77a8f7a7c726aaa6dadad7812acdf8772ba7c/sdks/python/apache_beam/runners/common.py#L892 except that we will be using the `truncate` transform id, input id, and coder.
   
   I suggest following the approach there where we have a static method that does all the heavy lifting so we can test it well and we have some simple wrappers which pass forward all the necessary arguments to the static method that is visible for testing.
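
As a rough illustration of what scaling the fraction across windows could look like, here is a sketch of a pure static helper. It is not the Python implementation linked above and not the code in this PR. The class name, method name, parameters, and the rounding and clamping choices are all assumptions made for the example. Each window is treated as one unit of work, and `currentWindowProgress` is the completed fraction of the current window.

```java
final class WindowSplitSketch {
  private WindowSplitSketch() {}

  /** Hypothetical result type: where the primary should stop. */
  static final class SplitPoint {
    /** True if the split lands inside the window currently being processed. */
    final boolean withinCurrentWindow;
    /** Fraction to forward to the downstream split; NaN for whole-window splits. */
    final double fractionOfCurrentWindowRemainder;
    /** Windows at or after this index go to the residual. */
    final int newStopWindowIndex;

    SplitPoint(boolean within, double fraction, int newStopWindowIndex) {
      this.withinCurrentWindow = within;
      this.fractionOfCurrentWindowRemainder = fraction;
      this.newStopWindowIndex = newStopWindowIndex;
    }
  }

  static SplitPoint computeSplitPoint(
      int currentWindowIndex,
      int stopWindowIndex,
      double currentWindowProgress,
      double fractionOfRemainder) {
    double remainingInCurrent = 1.0 - currentWindowProgress;
    int untouchedWindows = stopWindowIndex - currentWindowIndex - 1;
    // Total remaining work measured in windows.
    double totalRemaining = remainingInCurrent + untouchedWindows;
    // Amount of that work the primary should keep.
    double keep = fractionOfRemainder * totalRemaining;

    if (untouchedWindows == 0 || keep < remainingInCurrent) {
      // The split point falls inside the current window: rescale the fraction so it
      // is relative to the current window's remainder only.
      double scaled =
          remainingInCurrent > 0 ? Math.min(1.0, keep / remainingInCurrent) : 0.0;
      return new SplitPoint(true, scaled, currentWindowIndex + 1);
    }
    // Otherwise split at the nearest window boundary, leaving at least one window
    // for the residual.
    int wholeWindowsKept = (int) Math.round(keep - remainingInCurrent);
    int newStop = Math.min(currentWindowIndex + 1 + wholeWindowsKept, stopWindowIndex - 1);
    return new SplitPoint(false, Double.NaN, newStop);
  }
}
```

Since the helper takes only primitives, scenarios such as "last window", "split at a window boundary", and "fraction of zero" can each be covered by a one-line call, with the instance method reduced to a thin wrapper that acquires the split lock and forwards its state.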

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -1452,11 +1452,22 @@ public Instant getInitialWatermarkEstimatorState() {
   static class WindowObservingTestSplittableDoFn extends NonWindowObservingTestSplittableDoFn {
 
     private final PCollectionView<String> singletonSideInput;
+    private static final long PROCESSED_WINDOW = 1;
+    private boolean splitAtTruncate = false;
+    private long processedWindowCount = 0;

Review comment:
       ```suggestion
       private final boolean splitAtTruncate;
       private long processedWindowCount;
   ```

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -1452,11 +1452,22 @@ public Instant getInitialWatermarkEstimatorState() {
   static class WindowObservingTestSplittableDoFn extends NonWindowObservingTestSplittableDoFn {
 
     private final PCollectionView<String> singletonSideInput;
+    private static final long PROCESSED_WINDOW = 1;
+    private boolean splitAtTruncate = false;
+    private long processedWindowCount = 0;
 
     private WindowObservingTestSplittableDoFn(PCollectionView<String> singletonSideInput) {
       this.singletonSideInput = singletonSideInput;
     }
 
+    private static WindowObservingTestSplittableDoFn forSplitAtTruncate(
+        PCollectionView<String> singletonSideInput) {
+      WindowObservingTestSplittableDoFn doFn =
+          new WindowObservingTestSplittableDoFn(singletonSideInput);

Review comment:
       nit: update the constructor to take this parameter instead of mutating it on the instance.
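
A sketch of the shape this nit asks for, assuming a simplified class: `SplitAtTruncateFnSketch` and its `String` side-input field are hypothetical stand-ins for `WindowObservingTestSplittableDoFn` and its `PCollectionView<String>`. The flag becomes a `final` field set through the constructor, and the factory methods only choose its value.

```java
final class SplitAtTruncateFnSketch {
  private final String singletonSideInput; // stand-in for PCollectionView<String>
  private final boolean splitAtTruncate;
  private long processedWindowCount; // defaults to 0, no initializer needed

  private SplitAtTruncateFnSketch(String singletonSideInput, boolean splitAtTruncate) {
    this.singletonSideInput = singletonSideInput;
    this.splitAtTruncate = splitAtTruncate;
  }

  /** Default variant: no split is triggered during truncate. */
  static SplitAtTruncateFnSketch create(String singletonSideInput) {
    return new SplitAtTruncateFnSketch(singletonSideInput, false);
  }

  /** Variant used by tests that exercise a split during truncate. */
  static SplitAtTruncateFnSketch forSplitAtTruncate(String singletonSideInput) {
    return new SplitAtTruncateFnSketch(singletonSideInput, true);
  }

  boolean splitsAtTruncate() {
    return splitAtTruncate;
  }

  /** Records one processed window and returns the running count. */
  long recordProcessedWindow() {
    return ++processedWindowCount;
  }
}
```

Nothing mutates the flag after construction, so an instance cannot be observed in a half-configured state.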

##########
File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
##########
@@ -3013,19 +3069,107 @@ public void testProcessElementForTruncateAndSizeRestrictionForwardSplitWhenObser
             startFunctionRegistry,
             finishFunctionRegistry,
             teardownFunctions::add,
-            null /* addProgressRequestCallback */,
-            null /* bundleSplitListener */,
+            progressRequestCallbacks::add /* addProgressRequestCallback */,
+            splitListener /* bundleSplitListener */,
             null /* bundleFinalizer */);
+
+    assertThat(consumers.keySet(), containsInAnyOrder(inputPCollectionId, outputPCollectionId));
     FnDataReceiver<WindowedValue<?>> mainInput =
         consumers.getMultiplexingConsumer(inputPCollectionId);
     assertThat(mainInput, instanceOf(HandlesSplits.class));
 
-    assertEquals(0, ((HandlesSplits) mainInput).getProgress(), 0.0);
-    assertNull(((HandlesSplits) mainInput).trySplit(0.4));
+    mainOutputValues.clear();
+    BoundedWindow window1 = new IntervalWindow(new Instant(5), new Instant(10));
+    BoundedWindow window2 = new IntervalWindow(new Instant(6), new Instant(11));
+    BoundedWindow window3 = new IntervalWindow(new Instant(7), new Instant(12));
+    // Setup and launch the trySplit thread.
+    ExecutorService executorService = Executors.newSingleThreadExecutor();
+    Future<HandlesSplits.SplitResult> trySplitFuture =
+        executorService.submit(
+            () -> {
+              try {
+                doFn.waitForSplitElementToBeProcessed();
+
+                return ((HandlesSplits) mainInput).trySplit(0);
+              } finally {
+                doFn.releaseWaitingProcessElementThread();
+              }
+            });
+
+    WindowedValue<?> splitValue =
+        valueInWindows(
+            KV.of(KV.of("7", KV.of(new OffsetRange(0, 6), GlobalWindow.TIMESTAMP_MIN_VALUE)), 6.0),
+            window1,
+            window2,
+            window3);
+    mainInput.accept(splitValue);
+    HandlesSplits.SplitResult trySplitResult = trySplitFuture.get();
+
+    // We expect that there are outputs from window1 and window2
+    assertThat(
+        mainOutputValues,
+        contains(
+            WindowedValue.of(
+                KV.of(
+                    KV.of("7", KV.of(new OffsetRange(0, 3), GlobalWindow.TIMESTAMP_MIN_VALUE)),
+                    3.0),
+                splitValue.getTimestamp(),
+                window1,
+                splitValue.getPane()),
+            WindowedValue.of(
+                KV.of(
+                    KV.of("7", KV.of(new OffsetRange(0, 3), GlobalWindow.TIMESTAMP_MIN_VALUE)),
+                    3.0),
+                splitValue.getTimestamp(),
+                window2,
+                splitValue.getPane())));
+
+    SplitResult expectedElementSplit = createSplitResult(0);

Review comment:
       We'll want to cover more scenarios than just this; see the ones I added in https://github.com/apache/beam/commit/ac6d80ed0cd31ec2b5233d6b3805ffb3ad71a0f1#diff-1b0bb2e59974e0a12a61a762a9add92a
   
   This would likely require refactoring the trySplit method into a static method that receives all of its parameters as arguments, so that we don't have to worry about the additional setup and can test just the core of the method. We can keep one variant around using this approach, which will help ensure that we are using the correct locks.



