You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/09/01 15:59:30 UTC

[beam] branch master updated: Wrap RestrictionTracker in SplittableParDoNaiveBounded

This is an automated email from the ASF dual-hosted git repository.

lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 163579b  Wrap RestrictionTracker in SplittableParDoNaiveBounded
     new ba2b25a  Merge pull request #12723 from boyuanzz/tracker
163579b is described below

commit 163579bbc962c3f3c6f1996fd4294f016f1c13ff
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Aug 28 16:02:24 2020 -0700

    Wrap RestrictionTracker in SplittableParDoNaiveBounded
---
 runners/core-construction-java/build.gradle        |  1 +
 .../construction/SplittableParDoNaiveBounded.java  | 77 ++++++++++++----------
 2 files changed, 45 insertions(+), 33 deletions(-)

diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle
index ad3ca53..34b91e4 100644
--- a/runners/core-construction-java/build.gradle
+++ b/runners/core-construction-java/build.gradle
@@ -39,6 +39,7 @@ dependencies {
   compile project(path: ":model:pipeline", configuration: "shadow")
   compile project(path: ":model:job-management", configuration: "shadow")
   compile project(path: ":sdks:java:core", configuration: "shadow")
+  compile project(path: ":sdks:java:fn-execution")
   compile library.java.vendored_grpc_1_26_0
   compile library.java.vendored_guava_26_0_jre
   compile library.java.classgraph
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 052a21d..46e5f8b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -22,6 +22,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -212,43 +214,52 @@ public class SplittableParDoNaiveBounded {
         WatermarkEstimatorStateT currentWatermarkEstimatorState = watermarkEstimatorState;
 
         RestrictionTracker<RestrictionT, PositionT> tracker =
-            invoker.invokeNewTracker(
-                new BaseArgumentProvider<InputT, OutputT>() {
-                  @Override
-                  public InputT element(DoFn<InputT, OutputT> doFn) {
-                    return c.element().getKey();
-                  }
-
-                  @Override
-                  public RestrictionT restriction() {
-                    return currentRestriction;
-                  }
-
-                  @Override
-                  public Instant timestamp(DoFn<InputT, OutputT> doFn) {
-                    return c.timestamp();
-                  }
-
+            RestrictionTrackers.observe(
+                invoker.invokeNewTracker(
+                    new BaseArgumentProvider<InputT, OutputT>() {
+                      @Override
+                      public InputT element(DoFn<InputT, OutputT> doFn) {
+                        return c.element().getKey();
+                      }
+
+                      @Override
+                      public RestrictionT restriction() {
+                        return currentRestriction;
+                      }
+
+                      @Override
+                      public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+                        return c.timestamp();
+                      }
+
+                      @Override
+                      public PipelineOptions pipelineOptions() {
+                        return c.getPipelineOptions();
+                      }
+
+                      @Override
+                      public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+                        return c.pane();
+                      }
+
+                      @Override
+                      public BoundedWindow window() {
+                        return w;
+                      }
+
+                      @Override
+                      public String getErrorContext() {
+                        return NaiveProcessFn.class.getSimpleName() + ".invokeNewTracker";
+                      }
+                    }),
+                new ClaimObserver<PositionT>() {
                   @Override
-                  public PipelineOptions pipelineOptions() {
-                    return c.getPipelineOptions();
-                  }
-
-                  @Override
-                  public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
-                    return c.pane();
-                  }
+                  public void onClaimed(PositionT position) {}
 
                   @Override
-                  public BoundedWindow window() {
-                    return w;
-                  }
-
-                  @Override
-                  public String getErrorContext() {
-                    return NaiveProcessFn.class.getSimpleName() + ".invokeNewTracker";
-                  }
+                  public void onClaimFailed(PositionT position) {}
                 });
+
         WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator =
             invoker.invokeNewWatermarkEstimator(
                 new BaseArgumentProvider<InputT, OutputT>() {