You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by lc...@apache.org on 2020/09/01 15:59:30 UTC
[beam] branch master updated: Wrap RestrictionTracker in
SplittableParDoNaiveBounded
This is an automated email from the ASF dual-hosted git repository.
lcwik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 163579b Wrap RestrictionTracker in SplittableParDoNaiveBounded
new ba2b25a Merge pull request #12723 from boyuanzz/tracker
163579b is described below
commit 163579bbc962c3f3c6f1996fd4294f016f1c13ff
Author: Boyuan Zhang <bo...@google.com>
AuthorDate: Fri Aug 28 16:02:24 2020 -0700
Wrap RestrictionTracker in SplittableParDoNaiveBounded
---
runners/core-construction-java/build.gradle | 1 +
.../construction/SplittableParDoNaiveBounded.java | 77 ++++++++++++----------
2 files changed, 45 insertions(+), 33 deletions(-)
diff --git a/runners/core-construction-java/build.gradle b/runners/core-construction-java/build.gradle
index ad3ca53..34b91e4 100644
--- a/runners/core-construction-java/build.gradle
+++ b/runners/core-construction-java/build.gradle
@@ -39,6 +39,7 @@ dependencies {
compile project(path: ":model:pipeline", configuration: "shadow")
compile project(path: ":model:job-management", configuration: "shadow")
compile project(path: ":sdks:java:core", configuration: "shadow")
+ compile project(path: ":sdks:java:fn-execution")
compile library.java.vendored_grpc_1_26_0
compile library.java.vendored_guava_26_0_jre
compile library.java.classgraph
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
index 052a21d..46e5f8b 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/SplittableParDoNaiveBounded.java
@@ -22,6 +22,8 @@ import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Prec
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers;
+import org.apache.beam.sdk.fn.splittabledofn.RestrictionTrackers.ClaimObserver;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.runners.PTransformOverrideFactory;
@@ -212,43 +214,52 @@ public class SplittableParDoNaiveBounded {
WatermarkEstimatorStateT currentWatermarkEstimatorState = watermarkEstimatorState;
RestrictionTracker<RestrictionT, PositionT> tracker =
- invoker.invokeNewTracker(
- new BaseArgumentProvider<InputT, OutputT>() {
- @Override
- public InputT element(DoFn<InputT, OutputT> doFn) {
- return c.element().getKey();
- }
-
- @Override
- public RestrictionT restriction() {
- return currentRestriction;
- }
-
- @Override
- public Instant timestamp(DoFn<InputT, OutputT> doFn) {
- return c.timestamp();
- }
-
+ RestrictionTrackers.observe(
+ invoker.invokeNewTracker(
+ new BaseArgumentProvider<InputT, OutputT>() {
+ @Override
+ public InputT element(DoFn<InputT, OutputT> doFn) {
+ return c.element().getKey();
+ }
+
+ @Override
+ public RestrictionT restriction() {
+ return currentRestriction;
+ }
+
+ @Override
+ public Instant timestamp(DoFn<InputT, OutputT> doFn) {
+ return c.timestamp();
+ }
+
+ @Override
+ public PipelineOptions pipelineOptions() {
+ return c.getPipelineOptions();
+ }
+
+ @Override
+ public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
+ return c.pane();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ return w;
+ }
+
+ @Override
+ public String getErrorContext() {
+ return NaiveProcessFn.class.getSimpleName() + ".invokeNewTracker";
+ }
+ }),
+ new ClaimObserver<PositionT>() {
@Override
- public PipelineOptions pipelineOptions() {
- return c.getPipelineOptions();
- }
-
- @Override
- public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
- return c.pane();
- }
+ public void onClaimed(PositionT position) {}
@Override
- public BoundedWindow window() {
- return w;
- }
-
- @Override
- public String getErrorContext() {
- return NaiveProcessFn.class.getSimpleName() + ".invokeNewTracker";
- }
+ public void onClaimFailed(PositionT position) {}
});
+
WatermarkEstimator<WatermarkEstimatorStateT> watermarkEstimator =
invoker.invokeNewWatermarkEstimator(
new BaseArgumentProvider<InputT, OutputT>() {