You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jk...@apache.org on 2017/04/19 01:12:26 UTC
[2/7] beam git commit: ProcessFn remembers more info about its application context
ProcessFn remembers more info about its application context
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3fd88901
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3fd88901
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3fd88901
Branch: refs/heads/master
Commit: 3fd889015afa8528801d2c35c8c9f72b944ea472
Parents: a51bdd2
Author: Eugene Kirpichov <ki...@google.com>
Authored: Sat Apr 15 16:39:51 2017 -0700
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Tue Apr 18 18:02:06 2017 -0700
----------------------------------------------------------------------
.../beam/runners/core/SplittableParDo.java | 35 +++++++++++++++-----
.../beam/runners/core/SplittableParDoTest.java | 8 ++++-
2 files changed, 34 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 9cc965a..44db1f7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -115,7 +115,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
fn,
input.getCoder(),
restrictionCoder,
- input.getWindowingStrategy(),
+ (WindowingStrategy<InputT, ?>) input.getWindowingStrategy(),
parDo.getSideInputs(),
parDo.getMainOutputTag(),
parDo.getAdditionalOutputTags()));
@@ -185,7 +185,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private final DoFn<InputT, OutputT> fn;
private final Coder<InputT> elementCoder;
private final Coder<RestrictionT> restrictionCoder;
- private final WindowingStrategy<?, ?> windowingStrategy;
+ private final WindowingStrategy<InputT, ?> windowingStrategy;
private final List<PCollectionView<?>> sideInputs;
private final TupleTag<OutputT> mainOutputTag;
private final TupleTagList additionalOutputTags;
@@ -202,7 +202,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
DoFn<InputT, OutputT> fn,
Coder<InputT> elementCoder,
Coder<RestrictionT> restrictionCoder,
- WindowingStrategy<?, ?> windowingStrategy,
+ WindowingStrategy<InputT, ?> windowingStrategy,
List<PCollectionView<?>> sideInputs,
TupleTag<OutputT> mainOutputTag,
TupleTagList additionalOutputTags) {
@@ -234,7 +234,7 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
public ProcessFn<InputT, OutputT, RestrictionT, TrackerT> newProcessFn(
DoFn<InputT, OutputT> fn) {
return new SplittableParDo.ProcessFn<>(
- fn, elementCoder, restrictionCoder, windowingStrategy.getWindowFn().windowCoder());
+ fn, elementCoder, restrictionCoder, windowingStrategy);
}
@Override
@@ -351,7 +351,9 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
private StateTag<Object, ValueState<RestrictionT>> restrictionTag;
private final DoFn<InputT, OutputT> fn;
- private final Coder<? extends BoundedWindow> windowCoder;
+ private final Coder<InputT> elementCoder;
+ private final Coder<RestrictionT> restrictionCoder;
+ private final WindowingStrategy<InputT, ?> inputWindowingStrategy;
private transient StateInternalsFactory<String> stateInternalsFactory;
private transient TimerInternalsFactory<String> timerInternalsFactory;
@@ -364,11 +366,16 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
DoFn<InputT, OutputT> fn,
Coder<InputT> elementCoder,
Coder<RestrictionT> restrictionCoder,
- Coder<? extends BoundedWindow> windowCoder) {
+ WindowingStrategy<InputT, ?> inputWindowingStrategy) {
this.fn = fn;
- this.windowCoder = windowCoder;
+ this.elementCoder = elementCoder;
+ this.restrictionCoder = restrictionCoder;
+ this.inputWindowingStrategy = inputWindowingStrategy;
this.elementTag =
- StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder));
+ StateTags.value(
+ "element",
+ WindowedValue.getFullCoder(
+ elementCoder, inputWindowingStrategy.getWindowFn().windowCoder()));
this.restrictionTag = StateTags.value("restriction", restrictionCoder);
}
@@ -389,6 +396,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
return fn;
}
+ public Coder<InputT> getElementCoder() {
+ return elementCoder;
+ }
+
+ public Coder<RestrictionT> getRestrictionCoder() {
+ return restrictionCoder;
+ }
+
+ public WindowingStrategy<InputT, ?> getInputWindowingStrategy() {
+ return inputWindowingStrategy;
+ }
+
@Setup
public void setup() throws Exception {
invoker = DoFnInvokers.invokerFor(fn);
http://git-wip-us.apache.org/repos/asf/beam/blob/3fd88901/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index 2c89543..5629635 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -51,11 +51,13 @@ import org.apache.beam.sdk.transforms.splittabledofn.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.SideInputReader;
import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -220,9 +222,13 @@ public class SplittableParDoTest {
int maxOutputsPerBundle,
Duration maxBundleDuration)
throws Exception {
+ // The exact windowing strategy doesn't matter in this test, but it should be able to
+ // encode IntervalWindow's because that's what all tests here use.
+ WindowingStrategy<InputT, BoundedWindow> windowingStrategy =
+ (WindowingStrategy) WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(1)));
final SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn =
new SplittableParDo.ProcessFn<>(
- fn, inputCoder, restrictionCoder, IntervalWindow.getCoder());
+ fn, inputCoder, restrictionCoder, windowingStrategy);
this.tester = DoFnTester.of(processFn);
this.timerInternals = new InMemoryTimerInternals();
this.stateInternals = new TestInMemoryStateInternals<>("dummy");