You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/08/15 04:36:04 UTC

[beam] branch master updated: Scale progress with respect to windows observation.

This is an automated email from the ASF dual-hosted git repository.

boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new a76edaa  Scale progress with respect to windows observation.
     new 314caa8  Merge pull request #12430 from boyuanzz/scale_progress
a76edaa is described below

commit a76edaa9582ac433a5f28c3b409d139134f5a8e1
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Jul 30 16:27:00 2020 -0700

    Scale progress with respect to windows observation.
---
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 55 ++++++++++++++++++++--
 .../beam/fn/harness/FnApiDoFnRunnerTest.java       |  6 +--
 2 files changed, 53 insertions(+), 8 deletions(-)

diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 702bcf4..b5ffe5a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -251,6 +251,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   private ListIterator<BoundedWindow> currentWindowIterator;
 
   /**
+   * Only valid during {@link
+   * #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
+   * #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
+   */
+  private int windowStopIndex;
+
+  /**
    * Only valid during {@link #processElementForPairWithRestriction}, {@link
    * #processElementForSplitRestriction}, and {@link #processElementForSizedElementAndRestriction},
    * null otherwise.
@@ -512,9 +519,12 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
             || !sideInputMapping.isEmpty()) {
           // Only forward split/progress when the only consumer is splittable.
           if (mainOutputConsumers.size() == 1
-              && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
+              && Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
+                  private final HandlesSplits splitDelegate =
+                      (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
+
                   @Override
                   public void accept(WindowedValue input) throws Exception {
                     processElementForWindowObservingTruncateRestriction(input);
@@ -526,9 +536,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
                     return null;
                   }
 
-                  // TODO(BEAM-10303): Progress should work with window observing optimization.
                   @Override
                   public double getProgress() {
+                    Progress progress =
+                        FnApiDoFnRunner.this.getProgressFromWindowObservingTruncate(
+                            splitDelegate.getProgress());
+                    if (progress != null) {
+                      double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+                      if (totalWork > 0) {
+                        return progress.getWorkCompleted() / totalWork;
+                      }
+                    }
                     return 0;
                   }
                 };
@@ -541,11 +559,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
         } else {
           // Only forward split/progress when the only consumer is splittable.
           if (mainOutputConsumers.size() == 1
-              && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
+              && Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) {
             mainInputConsumer =
                 new SplittableFnDataReceiver() {
                   private final HandlesSplits splitDelegate =
-                      (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
+                      (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
 
                   @Override
                   public void accept(WindowedValue input) throws Exception {
@@ -940,6 +958,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           currentElement.getWindows() instanceof List
               ? ((List) currentElement.getWindows()).listIterator()
               : ImmutableList.<BoundedWindow>copyOf(elem.getWindows()).listIterator();
+      windowStopIndex = currentElement.getWindows().size();
       while (true) {
         synchronized (splitLock) {
           if (!currentWindowIterator.hasNext()) {
@@ -1029,12 +1048,37 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
   private Progress getProgress() {
     synchronized (splitLock) {
       if (currentTracker instanceof RestrictionTracker.HasProgress) {
-        return ((HasProgress) currentTracker).getProgress();
+        return scaleProgress(
+            ((HasProgress) currentTracker).getProgress(),
+            currentWindowIterator.previousIndex(),
+            windowStopIndex);
+      }
+    }
+    return null;
+  }
+
+  private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+    synchronized (splitLock) {
+      if (currentWindow != null) {
+        return scaleProgress(
+            Progress.from(elementCompleted, 1 - elementCompleted),
+            currentWindowIterator.previousIndex(),
+            windowStopIndex);
       }
     }
     return null;
   }
 
+  private static Progress scaleProgress(
+      Progress progress, int currentWindowIndex, int stopWindowIndex) {
+    double totalWorkPerWindow = progress.getWorkCompleted() + progress.getWorkRemaining();
+    double completed = totalWorkPerWindow * currentWindowIndex + progress.getWorkCompleted();
+    double remaining =
+        totalWorkPerWindow * (stopWindowIndex - currentWindowIndex - 1)
+            + progress.getWorkRemaining();
+    return Progress.from(completed, remaining);
+  }
+
   private HandlesSplits.SplitResult trySplitForElementAndRestriction(
       double fractionOfRemainder, Duration resumeDelay) {
     KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
@@ -1059,6 +1103,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
       List<BoundedWindow> primaryFullyProcessedWindows =
           ImmutableList.copyOf(
               Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+      windowStopIndex = currentWindowIterator.nextIndex();
       // Advances the iterator consuming the remaining windows.
       List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
       // If the window has been observed then the splitAndSize method would have already
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 6b7669e..63e784a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -2043,8 +2043,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
               () -> {
                 try {
                   doFn.waitForSplitElementToBeProcessed();
-                  // Currently processing "3" out of range [0, 5) elements.
-                  assertEquals(0.6, ((HandlesSplits) mainInput).getProgress(), 0.01);
+                  // Currently processing "3" out of range [0, 5) elements for the first window.
+                  assertEquals(0.3, ((HandlesSplits) mainInput).getProgress(), 0.01);
 
                   // Check that during progressing of an element we report progress
                   List<MonitoringInfo> mis =
@@ -2066,7 +2066,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
                   expectedRemaining.setPayload(
                       ByteString.copyFrom(
                           CoderUtils.encodeToByteArray(
-                              IterableCoder.of(DoubleCoder.of()), Collections.singletonList(2.0))));
+                              IterableCoder.of(DoubleCoder.of()), Collections.singletonList(7.0))));
                   assertThat(
                       mis,
                       containsInAnyOrder(expectedCompleted.build(), expectedRemaining.build()));