You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jr...@apache.org on 2023/04/20 14:58:03 UTC

[beam] branch release-2.47.0 updated: Implement naive progress tracking at BoundedSourceAsSDFRestrictionTracker (#26361)

This is an automated email from the ASF dual-hosted git repository.

jrmccluskey pushed a commit to branch release-2.47.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.47.0 by this push:
     new 4d1f1f19086 Implement naive progress tracking at BoundedSourceAsSDFRestrictionTracker (#26361)
4d1f1f19086 is described below

commit 4d1f1f19086d80bb809f76ab8cd7d409bafe122e
Author: Bruno Volpato <bv...@google.com>
AuthorDate: Thu Apr 20 10:57:52 2023 -0400

    Implement naive progress tracking at BoundedSourceAsSDFRestrictionTracker (#26361)
---
 .../src/main/java/org/apache/beam/sdk/io/Read.java | 22 ++++++++++++++++++----
 .../splittabledofn/RestrictionTracker.java         |  3 +++
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index 2f60d6037bf..ad71f2ef8e6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -50,6 +50,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
 import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
 import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -331,7 +332,7 @@ public class Read {
      */
     private static class BoundedSourceAsSDFRestrictionTracker<
             BoundedSourceT extends BoundedSource<T>, T>
-        extends RestrictionTracker<BoundedSourceT, TimestampedValue<T>[]> {
+        extends RestrictionTracker<BoundedSourceT, TimestampedValue<T>[]> implements HasProgress {
       private final BoundedSourceT initialRestriction;
       private final PipelineOptions pipelineOptions;
       private BoundedSource.BoundedReader<T> currentReader;
@@ -442,6 +443,19 @@ public class Read {
       public IsBounded isBounded() {
         return IsBounded.BOUNDED;
       }
+
+      @Override
+      public Progress getProgress() {
+        // Unknown is treated as 0 progress
+        if (currentReader == null) {
+          return Progress.NONE;
+        }
+        Double consumedFraction = currentReader.getFractionConsumed();
+        if (consumedFraction == null) {
+          return Progress.NONE;
+        }
+        return Progress.from(consumedFraction, 1 - consumedFraction);
+      }
     }
   }
 
@@ -968,7 +982,7 @@ public class Read {
       public Progress getProgress() {
         // We treat the empty source as implicitly done.
         if (currentRestriction().getSource() instanceof EmptyUnboundedSource) {
-          return RestrictionTracker.Progress.from(1, 0);
+          return Progress.from(1, 0);
         }
 
         boolean resetReaderAfter = false;
@@ -986,7 +1000,7 @@ public class Read {
           if (size != UnboundedReader.BACKLOG_UNKNOWN) {
             // The UnboundedSource/UnboundedReader API has no way of reporting how much work
             // has been completed so runners can only see the work remaining changing.
-            return RestrictionTracker.Progress.from(0, size);
+            return Progress.from(0, size);
           }
 
           // TODO: Support "global" backlog reporting
@@ -996,7 +1010,7 @@ public class Read {
           // }
 
           // We treat unknown as 0 progress
-          return RestrictionTracker.Progress.from(0, 1);
+          return Progress.NONE;
         } finally {
           if (resetReaderAfter) {
             cacheCurrentReader(initialRestriction);
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 88167d4f2f0..8ab8d3ac8b4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -176,6 +176,9 @@ public abstract class RestrictionTracker<RestrictionT, PositionT> {
   @AutoValue
   public abstract static class Progress {
 
+    /** Constant Progress instance to be used when no work has been completed yet. */
+    public static final Progress NONE = from(0, 1);
+
     /**
      * A representation for the amount of known completed and remaining work. See {@link
      * HasProgress#getProgress()} for details.