You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@beam.apache.org by jr...@apache.org on 2023/04/20 14:58:03 UTC
[beam] branch release-2.47.0 updated: Implement naive progress tracking at BoundedSourceAsSDFRestrictionTracker (#26361)
This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch release-2.47.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.47.0 by this push:
new 4d1f1f19086 Implement naive progress tracking at BoundedSourceAsSDFRestrictionTracker (#26361)
4d1f1f19086 is described below
commit 4d1f1f19086d80bb809f76ab8cd7d409bafe122e
Author: Bruno Volpato <bv...@google.com>
AuthorDate: Thu Apr 20 10:57:52 2023 -0400
Implement naive progress tracking at BoundedSourceAsSDFRestrictionTracker (#26361)
---
.../src/main/java/org/apache/beam/sdk/io/Read.java | 22 ++++++++++++++++++----
.../splittabledofn/RestrictionTracker.java | 3 +++
2 files changed, 21 insertions(+), 4 deletions(-)
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 2f60d6037bf..ad71f2ef8e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -331,7 +332,7 @@ public class Read {
*/
private static class BoundedSourceAsSDFRestrictionTracker<
BoundedSourceT extends BoundedSource<T>, T>
- extends RestrictionTracker<BoundedSourceT, TimestampedValue<T>[]> {
+ extends RestrictionTracker<BoundedSourceT, TimestampedValue<T>[]> implements HasProgress {
private final BoundedSourceT initialRestriction;
private final PipelineOptions pipelineOptions;
private BoundedSource.BoundedReader<T> currentReader;
@@ -442,6 +443,19 @@ public class Read {
public IsBounded isBounded() {
return IsBounded.BOUNDED;
}
+
+ @Override
+ public Progress getProgress() {
+ // Unknown is treated as 0 progress
+ if (currentReader == null) {
+ return Progress.NONE;
+ }
+ Double consumedFraction = currentReader.getFractionConsumed();
+ if (consumedFraction == null) {
+ return Progress.NONE;
+ }
+ return Progress.from(consumedFraction, 1 - consumedFraction);
+ }
}
}
@@ -968,7 +982,7 @@ public class Read {
public Progress getProgress() {
// We treat the empty source as implicitly done.
if (currentRestriction().getSource() instanceof EmptyUnboundedSource) {
- return RestrictionTracker.Progress.from(1, 0);
+ return Progress.from(1, 0);
}
boolean resetReaderAfter = false;
@@ -986,7 +1000,7 @@ public class Read {
if (size != UnboundedReader.BACKLOG_UNKNOWN) {
// The UnboundedSource/UnboundedReader API has no way of reporting how much work
// has been completed so runners can only see the work remaining changing.
- return RestrictionTracker.Progress.from(0, size);
+ return Progress.from(0, size);
}
// TODO: Support "global" backlog reporting
@@ -996,7 +1010,7 @@ public class Read {
// }
// We treat unknown as 0 progress
- return RestrictionTracker.Progress.from(0, 1);
+ return Progress.NONE;
} finally {
if (resetReaderAfter) {
cacheCurrentReader(initialRestriction);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 88167d4f2f0..8ab8d3ac8b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -176,6 +176,9 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
@AutoValue
public abstract static class Progress {
+ /** Constant Progress instance to be used when no work has been completed yet. */
+ public static final Progress NONE = from(0, 1);
+
/**
* A representation for the amount of known completed and remaining work. See {@link
* HasProgress#getProgress()} for details.