You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by bo...@apache.org on 2020/08/15 04:36:04 UTC
[beam] branch master updated: Scale progress with respect to
windows observation.
This is an automated email from the ASF dual-hosted git repository.
boyuanz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new a76edaa Scale progress with respect to windows observation.
new 314caa8 Merge pull request #12430 from boyuanzz/scale_progress
a76edaa is described below
commit a76edaa9582ac433a5f28c3b409d139134f5a8e1
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Thu Jul 30 16:27:00 2020 -0700
Scale progress with respect to windows observation.
---
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 55 ++++++++++++++++++++--
.../beam/fn/harness/FnApiDoFnRunnerTest.java | 6 +--
2 files changed, 53 insertions(+), 8 deletions(-)
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 702bcf4..b5ffe5a 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -251,6 +251,13 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private ListIterator<BoundedWindow> currentWindowIterator;
/**
+ * Only valid during {@link
+ * #processElementForWindowObservingSizedElementAndRestriction(WindowedValue)} and {@link
+ * #processElementForWindowObservingTruncateRestriction(WindowedValue)}.
+ */
+ private int windowStopIndex;
+
+ /**
* Only valid during {@link #processElementForPairWithRestriction}, {@link
* #processElementForSplitRestriction}, and {@link #processElementForSizedElementAndRestriction},
* null otherwise.
@@ -512,9 +519,12 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
|| !sideInputMapping.isEmpty()) {
// Only forward split/progress when the only consumer is splittable.
if (mainOutputConsumers.size() == 1
- && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
+ && Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) {
mainInputConsumer =
new SplittableFnDataReceiver() {
+ private final HandlesSplits splitDelegate =
+ (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
+
@Override
public void accept(WindowedValue input) throws Exception {
processElementForWindowObservingTruncateRestriction(input);
@@ -526,9 +536,17 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
return null;
}
- // TODO(BEAM-10303): Progress should work with window observing optimization.
@Override
public double getProgress() {
+ Progress progress =
+ FnApiDoFnRunner.this.getProgressFromWindowObservingTruncate(
+ splitDelegate.getProgress());
+ if (progress != null) {
+ double totalWork = progress.getWorkCompleted() + progress.getWorkRemaining();
+ if (totalWork > 0) {
+ return progress.getWorkCompleted() / totalWork;
+ }
+ }
return 0;
}
};
@@ -541,11 +559,11 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
} else {
// Only forward split/progress when the only consumer is splittable.
if (mainOutputConsumers.size() == 1
- && Iterables.get(mainOutputConsumers, 0) instanceof HandlesSplits) {
+ && Iterables.getOnlyElement(mainOutputConsumers) instanceof HandlesSplits) {
mainInputConsumer =
new SplittableFnDataReceiver() {
private final HandlesSplits splitDelegate =
- (HandlesSplits) Iterables.get(mainOutputConsumers, 0);
+ (HandlesSplits) Iterables.getOnlyElement(mainOutputConsumers);
@Override
public void accept(WindowedValue input) throws Exception {
@@ -940,6 +958,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
currentElement.getWindows() instanceof List
? ((List) currentElement.getWindows()).listIterator()
: ImmutableList.<BoundedWindow>copyOf(elem.getWindows()).listIterator();
+ windowStopIndex = currentElement.getWindows().size();
while (true) {
synchronized (splitLock) {
if (!currentWindowIterator.hasNext()) {
@@ -1029,12 +1048,37 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
private Progress getProgress() {
synchronized (splitLock) {
if (currentTracker instanceof RestrictionTracker.HasProgress) {
- return ((HasProgress) currentTracker).getProgress();
+ return scaleProgress(
+ ((HasProgress) currentTracker).getProgress(),
+ currentWindowIterator.previousIndex(),
+ windowStopIndex);
+ }
+ }
+ return null;
+ }
+
+ private Progress getProgressFromWindowObservingTruncate(double elementCompleted) {
+ synchronized (splitLock) {
+ if (currentWindow != null) {
+ return scaleProgress(
+ Progress.from(elementCompleted, 1 - elementCompleted),
+ currentWindowIterator.previousIndex(),
+ windowStopIndex);
}
}
return null;
}
+ private static Progress scaleProgress(
+ Progress progress, int currentWindowIndex, int stopWindowIndex) {
+ double totalWorkPerWindow = progress.getWorkCompleted() + progress.getWorkRemaining();
+ double completed = totalWorkPerWindow * currentWindowIndex + progress.getWorkCompleted();
+ double remaining =
+ totalWorkPerWindow * (stopWindowIndex - currentWindowIndex - 1)
+ + progress.getWorkRemaining();
+ return Progress.from(completed, remaining);
+ }
+
private HandlesSplits.SplitResult trySplitForElementAndRestriction(
double fractionOfRemainder, Duration resumeDelay) {
KV<Instant, WatermarkEstimatorStateT> watermarkAndState;
@@ -1059,6 +1103,7 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
List<BoundedWindow> primaryFullyProcessedWindows =
ImmutableList.copyOf(
Iterables.limit(currentElement.getWindows(), currentWindowIterator.previousIndex()));
+ windowStopIndex = currentWindowIterator.nextIndex();
// Advances the iterator consuming the remaining windows.
List<BoundedWindow> residualUnprocessedWindows = ImmutableList.copyOf(currentWindowIterator);
// If the window has been observed then the splitAndSize method would have already
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
index 6b7669e..63e784a 100644
--- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/FnApiDoFnRunnerTest.java
@@ -2043,8 +2043,8 @@ public class FnApiDoFnRunnerTest implements Serializable {
() -> {
try {
doFn.waitForSplitElementToBeProcessed();
- // Currently processing "3" out of range [0, 5) elements.
- assertEquals(0.6, ((HandlesSplits) mainInput).getProgress(), 0.01);
+ // Currently processing "3" out of range [0, 5) elements for the first window.
+ assertEquals(0.3, ((HandlesSplits) mainInput).getProgress(), 0.01);
// Check that during progressing of an element we report progress
List<MonitoringInfo> mis =
@@ -2066,7 +2066,7 @@ public class FnApiDoFnRunnerTest implements Serializable {
expectedRemaining.setPayload(
ByteString.copyFrom(
CoderUtils.encodeToByteArray(
- IterableCoder.of(DoubleCoder.of()), Collections.singletonList(2.0))));
+ IterableCoder.of(DoubleCoder.of()), Collections.singletonList(7.0))));
assertThat(
mis,
containsInAnyOrder(expectedCompleted.build(), expectedRemaining.build()));