You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/04/19 19:14:38 UTC

[04/50] [abbrv] beam git commit: ProcessFn remembers more info about its application context

ProcessFn remembers more info about its application context


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3fd88901
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3fd88901
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3fd88901

Branch: refs/heads/DSL_SQL
Commit: 3fd889015afa8528801d2c35c8c9f72b944ea472
Parents: a51bdd2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 15 16:39:51 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDo.java      | 35 +++++++++++++++-----
 .../beam/runners/core/SplittableParDoTest.java  |  8 ++++-
 2 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 9cc965a..44db1f7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -115,7 +115,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
             fn,
             input.getCoder(),
             restrictionCoder,
-            input.getWindowingStrategy(),
+            (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
             parDo.getSideInputs(),
             parDo.getMainOutputTag(),
             parDo.getAdditionalOutputTags()));
@@ -185,7 +185,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     private final DoFn<InputT, OutputT> fn;
     private final Coder<InputT> elementCoder;
     private final Coder<RestrictionT> restrictionCoder;
-    private final WindowingStrategy<?, ?> windowingStrategy;
+    private final WindowingStrategy<InputT, ?> windowingStrategy;
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList additionalOutputTags;
@@ -202,7 +202,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         DoFn<InputT, OutputT> fn,
         Coder<InputT> elementCoder,
         Coder<RestrictionT> restrictionCoder,
-        WindowingStrategy<?, ?> windowingStrategy,
+        WindowingStrategy<InputT, ?> windowingStrategy,
         List<PCollectionView<?>> sideInputs,
         TupleTag<OutputT> mainOutputTag,
         TupleTagList additionalOutputTags) {
@@ -234,7 +234,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
         DoFn<InputT, OutputT> fn) {
       return new SplittableParDo.ProcessFn<>(
-          fn, elementCoder, restrictionCoder, windowingStrategy.getWindowFn().windowCoder());
+          fn, elementCoder, restrictionCoder, windowingStrategy);
     }
 
     @Override
@@ -351,7 +351,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
     private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
 
     private final DoFn<InputT, OutputT> fn;
-    private final Coder<? extends BoundedWindow> windowCoder;
+    private final Coder<InputT> elementCoder;
+    private final Coder<RestrictionT> restrictionCoder;
+    private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
 
     private transient StateInternalsFactory<String> stateInternalsFactory;
     private transient TimerInternalsFactory<String> timerInternalsFactory;
@@ -364,11 +366,16 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         DoFn<InputT, OutputT> fn,
         Coder<InputT> elementCoder,
         Coder<RestrictionT> restrictionCoder,
-        Coder<? extends BoundedWindow> windowCoder) {
+        WindowingStrategy<InputT, ?> inputWindowingStrategy) {
       this.fn = fn;
-      this.windowCoder = windowCoder;
+      this.elementCoder = elementCoder;
+      this.restrictionCoder = restrictionCoder;
+      this.inputWindowingStrategy = inputWindowingStrategy;
       this.elementTag =
-          StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
+          StateTags.value(
+              "element",
+              WindowedValue.getFullCoder(
+                  elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
       this.restrictionTag = StateTags.value("restriction", restrictionCoder);
     }
 
@@ -389,6 +396,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       return fn;
     }
 
+    public Coder<InputT> getElementCoder() {
+      return elementCoder;
+    }
+
+    public Coder<RestrictionT> getRestrictionCoder() {
+      return restrictionCoder;
+    }
+
+    public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
+      return inputWindowingStrategy;
+    }
+
     @Setup
     public void setup() throws Exception {
       invoker = DoFnInvokers.invokerFor(fn);

http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 2c89543..5629635 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -51,11 +51,13 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
 import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
 import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -220,9 +222,13 @@ public class SplittableParDoTest {
         int maxOutputsPerBundle,
         Duration maxBundleDuration)
         throws Exception {
+      // The exact windowing strategy doesn't matter in this test, but it should be able to
+      // encode IntervalWindow's because that's what all tests here use.
+      WindowingStrategy<InputT, BoundedWindow> windowingStrategy =
+          (WindowingStrategy) WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1)));
       final SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
           new SplittableParDo.ProcessFn<>(
-              fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
+              fn, inputCoder, restrictionCoder, windowingStrategy);
       this.tester = DoFnTester.of(processFn);
       this.timerInternals = new InMemoryTimerInternals();
       this.stateInternals = new TestInMemoryStateInternals<>("dummy");