You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/11/18 06:32:15 UTC
[2/7] incubator-beam git commit: Refactor FlinkProcessContext more
cleanly into single- and multi-output versions
Refactor FlinkProcessContext more cleanly into single- and multi-output versions
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fb1f7be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fb1f7be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fb1f7be
Branch: refs/heads/master
Commit: 1fb1f7bebeacf54e66f606f373d68c14483a444c
Parents: 24cae56
Author: Eugene Kirpichov <ki...@google.com>
Authored: Fri Nov 11 16:37:42 2016 -0800
Committer: Eugene Kirpichov <ki...@google.com>
Committed: Thu Nov 17 13:18:36 2016 -0800
----------------------------------------------------------------------
.../functions/FlinkDoFnFunction.java | 12 +-
.../FlinkMergingNonShuffleReduceFunction.java | 14 +-
.../FlinkMergingPartialReduceFunction.java | 14 +-
.../functions/FlinkMergingReduceFunction.java | 12 +-
.../functions/FlinkMultiOutputDoFnFunction.java | 14 +-
.../FlinkMultiOutputProcessContext.java | 94 +----
.../functions/FlinkPartialReduceFunction.java | 14 +-
.../functions/FlinkProcessContext.java | 343 -------------------
.../functions/FlinkProcessContextBase.java | 285 +++++++++++++++
.../functions/FlinkReduceFunction.java | 14 +-
.../FlinkSingleOutputProcessContext.java | 70 ++++
11 files changed, 421 insertions(+), 465 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 798a23c..dc0ef0f 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -64,20 +64,20 @@ public class FlinkDoFnFunction<InputT, OutputT>
Iterable<WindowedValue<InputT>> values,
Collector<WindowedValue<OutputT>> out) throws Exception {
- FlinkProcessContext<InputT, OutputT> context = new FlinkProcessContext<>(
+ FlinkSingleOutputProcessContext<InputT, OutputT> context = new FlinkSingleOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
- out,
- sideInputs);
+ sideInputs, out
+ );
this.doFn.startBundle(context);
if (!requiresWindowAccess || hasSideInputs) {
// we don't need to explode the windows
for (WindowedValue<InputT> value : values) {
- context = context.forWindowedValue(value);
+ context.setWindowedValue(value);
doFn.processElement(context);
}
} else {
@@ -86,7 +86,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
// is in only one window
for (WindowedValue<InputT> value : values) {
for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
- context = context.forWindowedValue(explodedValue);
+ context.setWindowedValue(explodedValue);
doFn.processElement(context);
}
}
@@ -94,7 +94,7 @@ public class FlinkDoFnFunction<InputT, OutputT>
// set the windowed value to null so that the special logic for outputting
// in startBundle/finishBundle kicks in
- context = context.forWindowedValue(null);
+ context.setWindowedValue(null);
this.doFn.finishBundle(context);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
index dbaab17..a4284f8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java
@@ -92,14 +92,14 @@ public class FlinkMergingNonShuffleReduceFunction<
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
- FlinkProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
- new FlinkProcessContext<>(
+ FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, OutputT>> processContext =
+ new FlinkSingleOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
- out,
- sideInputs);
+ sideInputs, out
+ );
PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner =
PerKeyCombineFnRunners.create(combineFn);
@@ -141,7 +141,7 @@ public class FlinkMergingNonShuffleReduceFunction<
IntervalWindow currentWindow =
(IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
- processContext = processContext.forWindowedValue(currentValue);
+ processContext.setWindowedValue(currentValue);
AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
@@ -157,7 +157,7 @@ public class FlinkMergingNonShuffleReduceFunction<
// continue accumulating and merge windows
InputT value = nextValue.getValue().getValue();
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.combine(
@@ -175,7 +175,7 @@ public class FlinkMergingNonShuffleReduceFunction<
currentWindow = nextWindow;
InputT value = nextValue.getValue().getValue();
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
index bc09bdf..30d3326 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java
@@ -60,14 +60,14 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
- FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
- new FlinkProcessContext<>(
+ FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
+ new FlinkSingleOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
- out,
- sideInputs);
+ sideInputs, out
+ );
PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
PerKeyCombineFnRunners.create(combineFn);
@@ -109,7 +109,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
IntervalWindow currentWindow =
(IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows());
InputT firstValue = currentValue.getValue().getValue();
- processContext = processContext.forWindowedValue(currentValue);
+ processContext.setWindowedValue(currentValue);
AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
@@ -125,7 +125,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
// continue accumulating and merge windows
InputT value = nextValue.getValue().getValue();
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.combine(
@@ -143,7 +143,7 @@ public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends Inte
currentWindow = nextWindow;
InputT value = nextValue.getValue().getValue();
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
index 4050f47..29dc1e3 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java
@@ -62,14 +62,14 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
Iterable<WindowedValue<KV<K, AccumT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
- FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
- new FlinkProcessContext<>(
+ FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
+ new FlinkSingleOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
- out,
- sideInputs);
+ sideInputs, out
+ );
PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
PerKeyCombineFnRunners.create(combineFn);
@@ -127,7 +127,7 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
if (nextWindow.equals(currentWindow)) {
// continue accumulating and merge windows
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.mergeAccumulators(
key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
@@ -143,7 +143,7 @@ public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWi
windowTimestamps.clear();
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
currentWindow = nextWindow;
accumulator = nextValue.getValue().getValue();
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
index 810609e..7be4bb4 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -74,22 +74,22 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
Iterable<WindowedValue<InputT>> values,
Collector<WindowedValue<RawUnionValue>> out) throws Exception {
- FlinkProcessContext<InputT, OutputT> context =
+ FlinkMultiOutputProcessContext<InputT, OutputT> context =
new FlinkMultiOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
- out,
- outputMap,
- sideInputs);
+ sideInputs, out,
+ outputMap
+ );
this.doFn.startBundle(context);
if (!requiresWindowAccess || hasSideInputs) {
// we don't need to explode the windows
for (WindowedValue<InputT> value : values) {
- context = context.forWindowedValue(value);
+ context.setWindowedValue(value);
doFn.processElement(context);
}
} else {
@@ -98,7 +98,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
// is in only one window
for (WindowedValue<InputT> value : values) {
for (WindowedValue<InputT> explodedValue : value.explodeWindows()) {
- context = context.forWindowedValue(value);
+ context.setWindowedValue(value);
doFn.processElement(context);
}
}
@@ -106,7 +106,7 @@ public class FlinkMultiOutputDoFnFunction<InputT, OutputT>
// set the windowed value to null so that the special logic for outputting
// in startBundle/finishBundle kicks in
- context = context.forWindowedValue(null);
+ context.setWindowedValue(null);
this.doFn.finishBundle(context);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
index 153a2d7..a3d2b18 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java
@@ -33,95 +33,35 @@ import org.apache.flink.util.Collector;
import org.joda.time.Instant;
/**
- * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports
- * side outputs.
+ * {@link OldDoFn.ProcessContext} for {@link FlinkMultiOutputDoFnFunction} that supports side
+ * outputs.
*/
class FlinkMultiOutputProcessContext<InputT, OutputT>
- extends FlinkProcessContext<InputT, OutputT> {
+ extends FlinkProcessContextBase<InputT, OutputT> {
- // we need a different Collector from the base class
private final Collector<WindowedValue<RawUnionValue>> collector;
-
private final Map<TupleTag<?>, Integer> outputMap;
-
FlinkMultiOutputProcessContext(
PipelineOptions pipelineOptions,
RuntimeContext runtimeContext,
OldDoFn<InputT, OutputT> doFn,
WindowingStrategy<?, ?> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
Collector<WindowedValue<RawUnionValue>> collector,
- Map<TupleTag<?>, Integer> outputMap,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
- super(
- pipelineOptions,
- runtimeContext,
- doFn,
- windowingStrategy,
- new Collector<WindowedValue<OutputT>>() {
- @Override
- public void collect(WindowedValue<OutputT> outputTWindowedValue) {
-
- }
-
- @Override
- public void close() {
-
- }
- },
- sideInputs);
-
+ Map<TupleTag<?>, Integer> outputMap) {
+ super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs);
this.collector = collector;
this.outputMap = outputMap;
}
@Override
- public FlinkProcessContext<InputT, OutputT> forWindowedValue(
- WindowedValue<InputT> windowedValue) {
- this.windowedValue = windowedValue;
- return this;
- }
-
- @Override
- public void outputWithTimestamp(OutputT value, Instant timestamp) {
- if (windowedValue == null) {
- // we are in startBundle() or finishBundle()
-
- try {
- Collection windows = windowingStrategy.getWindowFn().assignWindows(
- new FlinkNoElementAssignContext(
- windowingStrategy.getWindowFn(),
- value,
- timestamp));
-
- collector.collect(
- WindowedValue.of(
- new RawUnionValue(0, value),
- timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
- windows,
- PaneInfo.NO_FIRING));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- } else {
- collector.collect(
- WindowedValue.of(
- new RawUnionValue(0, value),
- windowedValue.getTimestamp(),
- windowedValue.getWindows(),
- windowedValue.getPane()));
- }
- }
-
- @Override
protected void outputWithTimestampAndWindow(
OutputT value,
Instant timestamp,
Collection<? extends BoundedWindow> windows,
PaneInfo pane) {
- collector.collect(
- WindowedValue.of(
- new RawUnionValue(0, value), timestamp, windows, pane));
+ collector.collect(WindowedValue.of(new RawUnionValue(0, value), timestamp, windows, pane));
}
@Override
@@ -142,19 +82,24 @@ class FlinkMultiOutputProcessContext<InputT, OutputT>
throw new IllegalArgumentException("Unknown side output tag: " + tag);
}
+ outputUnionValue(value, timestamp, new RawUnionValue(index, value));
+ }
+
+ private <T> void outputUnionValue(T value, Instant timestamp, RawUnionValue unionValue) {
if (windowedValue == null) {
// we are in startBundle() or finishBundle()
try {
- Collection windows = windowingStrategy.getWindowFn().assignWindows(
- new FlinkNoElementAssignContext(
- windowingStrategy.getWindowFn(),
- value,
- timestamp));
+ Collection<? extends BoundedWindow> windows =
+ windowingStrategy
+ .getWindowFn()
+ .assignWindows(
+ new FlinkNoElementAssignContext(
+ windowingStrategy.getWindowFn(), value, timestamp));
collector.collect(
WindowedValue.of(
- new RawUnionValue(index, value),
+ unionValue,
timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
windows,
PaneInfo.NO_FIRING));
@@ -164,11 +109,10 @@ class FlinkMultiOutputProcessContext<InputT, OutputT>
} else {
collector.collect(
WindowedValue.of(
- new RawUnionValue(index, value),
+ unionValue,
windowedValue.getTimestamp(),
windowedValue.getWindows(),
windowedValue.getPane()));
}
-
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
index fa2ce4d..3ea456a 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java
@@ -88,14 +88,14 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
Iterable<WindowedValue<KV<K, InputT>>> elements,
Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception {
- FlinkProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
- new FlinkProcessContext<>(
+ FlinkSingleOutputProcessContext<KV<K, InputT>, KV<K, AccumT>> processContext =
+ new FlinkSingleOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
- out,
- sideInputs);
+ sideInputs, out
+ );
PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner =
PerKeyCombineFnRunners.create(combineFn);
@@ -132,7 +132,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
K key = currentValue.getValue().getKey();
BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null);
InputT firstValue = currentValue.getValue().getValue();
- processContext = processContext.forWindowedValue(currentValue);
+ processContext.setWindowedValue(currentValue);
AccumT accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, firstValue, processContext);
@@ -147,7 +147,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
if (nextWindow.equals(currentWindow)) {
// continue accumulating
InputT value = nextValue.getValue().getValue();
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.combine(
@@ -165,7 +165,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind
currentWindow = nextWindow;
InputT value = nextValue.getValue().getValue();
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.createAccumulator(key, processContext);
accumulator = combineFnRunner.addInput(key, accumulator, value, processContext);
windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
deleted file mode 100644
index 1b28a70..0000000
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContext.java
+++ /dev/null
@@ -1,343 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.flink.translation.functions;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.common.collect.Iterables;
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.Map;
-import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
-import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.util.TimerInternals;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingInternals;
-import org.apache.beam.sdk.util.WindowingStrategy;
-import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.values.PCollectionView;
-import org.apache.beam.sdk.values.TupleTag;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Collector;
-import org.joda.time.Instant;
-
-/**
- * {@link OldDoFn.ProcessContext} for our Flink Wrappers.
- */
-class FlinkProcessContext<InputT, OutputT>
- extends OldDoFn<InputT, OutputT>.ProcessContext {
-
- private final PipelineOptions pipelineOptions;
- private final RuntimeContext runtimeContext;
- private Collector<WindowedValue<OutputT>> collector;
- private final boolean requiresWindowAccess;
-
- protected WindowedValue<InputT> windowedValue;
-
- protected WindowingStrategy<?, ?> windowingStrategy;
-
- private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
-
- FlinkProcessContext(
- PipelineOptions pipelineOptions,
- RuntimeContext runtimeContext,
- OldDoFn<InputT, OutputT> doFn,
- WindowingStrategy<?, ?> windowingStrategy,
- Collector<WindowedValue<OutputT>> collector,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
- doFn.super();
- checkNotNull(pipelineOptions);
- checkNotNull(runtimeContext);
- checkNotNull(doFn);
- checkNotNull(collector);
-
- this.pipelineOptions = pipelineOptions;
- this.runtimeContext = runtimeContext;
- this.collector = collector;
- this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
- this.windowingStrategy = windowingStrategy;
- this.sideInputs = sideInputs;
-
- super.setupDelegateAggregators();
- }
-
- FlinkProcessContext(
- PipelineOptions pipelineOptions,
- RuntimeContext runtimeContext,
- OldDoFn<InputT, OutputT> doFn,
- WindowingStrategy<?, ?> windowingStrategy,
- Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs) {
- doFn.super();
- checkNotNull(pipelineOptions);
- checkNotNull(runtimeContext);
- checkNotNull(doFn);
-
- this.pipelineOptions = pipelineOptions;
- this.runtimeContext = runtimeContext;
- this.collector = null;
- this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
- this.windowingStrategy = windowingStrategy;
- this.sideInputs = sideInputs;
-
- super.setupDelegateAggregators();
- }
-
- public FlinkProcessContext<InputT, OutputT> forOutput(
- Collector<WindowedValue<OutputT>> collector) {
- this.collector = collector;
-
- // for now, returns ourselves, to be easy on the GC
- return this;
- }
-
-
-
- public FlinkProcessContext<InputT, OutputT> forWindowedValue(
- WindowedValue<InputT> windowedValue) {
- this.windowedValue = windowedValue;
-
- // for now, returns ourselves, to be easy on the GC
- return this;
- }
-
- @Override
- public InputT element() {
- return this.windowedValue.getValue();
- }
-
-
- @Override
- public Instant timestamp() {
- return windowedValue.getTimestamp();
- }
-
- @Override
- public BoundedWindow window() {
- if (!requiresWindowAccess) {
- throw new UnsupportedOperationException(
- "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
- }
- return Iterables.getOnlyElement(windowedValue.getWindows());
- }
-
- @Override
- public PaneInfo pane() {
- return windowedValue.getPane();
- }
-
- @Override
- public WindowingInternals<InputT, OutputT> windowingInternals() {
-
- return new WindowingInternals<InputT, OutputT>() {
-
- @Override
- public StateInternals stateInternals() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void outputWindowedValue(
- OutputT value,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- // TODO: Refactor this (get rid of duplication, move things around w.r.t.
- // FlinkMultiOutputProcessContext)
- collector.collect(WindowedValue.of(value, timestamp, windows, pane));
- outputWithTimestampAndWindow(value, timestamp, windows, pane);
- }
-
- @Override
- public <SideOutputT> void sideOutputWindowedValue(
- TupleTag<SideOutputT> tag,
- SideOutputT output,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- // TODO: Implement this
- throw new UnsupportedOperationException();
- }
-
- @Override
- public TimerInternals timerInternals() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Collection<? extends BoundedWindow> windows() {
- return windowedValue.getWindows();
- }
-
- @Override
- public PaneInfo pane() {
- return windowedValue.getPane();
- }
-
- @Override
- public <T> void writePCollectionViewData(TupleTag<?> tag,
- Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <ViewT> ViewT sideInput(
- PCollectionView<ViewT> view,
- BoundedWindow mainInputWindow) {
-
- checkNotNull(view, "View passed to sideInput cannot be null");
- checkNotNull(
- sideInputs.get(view),
- "Side input for " + view + " not available.");
-
- // get the side input strategy for mapping the window
- WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
-
- BoundedWindow sideInputWindow =
- windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow);
-
- Map<BoundedWindow, ViewT> sideInputs =
- runtimeContext.getBroadcastVariableWithInitializer(
- view.getTagInternal().getId(), new SideInputInitializer<>(view));
- return sideInputs.get(sideInputWindow);
- }
- };
- }
-
- @Override
- public PipelineOptions getPipelineOptions() {
- return pipelineOptions;
- }
-
- @Override
- public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) {
- checkNotNull(view, "View passed to sideInput cannot be null");
- checkNotNull(sideInputs.get(view), "Side input for " + view + " not available.");
- Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator();
- BoundedWindow window;
- if (!windowIter.hasNext()) {
- throw new IllegalStateException(
- "sideInput called when main input element is not in any windows");
- } else {
- window = windowIter.next();
- if (windowIter.hasNext()) {
- throw new IllegalStateException(
- "sideInput called when main input element is in multiple windows");
- }
- }
-
- // get the side input strategy for mapping the window
- WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
-
- BoundedWindow sideInputWindow =
- windowingStrategy.getWindowFn().getSideInputWindow(window);
-
- Map<BoundedWindow, ViewT> sideInputs =
- runtimeContext.getBroadcastVariableWithInitializer(
- view.getTagInternal().getId(), new SideInputInitializer<>(view));
- ViewT result = sideInputs.get(sideInputWindow);
- if (result == null) {
- result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
- }
- return result;
- }
-
- @Override
- public void output(OutputT value) {
- if (windowedValue != null) {
- outputWithTimestamp(value, windowedValue.getTimestamp());
- } else {
- outputWithTimestamp(value, null);
- }
- }
-
- @Override
- public void outputWithTimestamp(OutputT value, Instant timestamp) {
- if (windowedValue == null) {
- // we are in startBundle() or finishBundle()
-
- try {
- Collection windows = windowingStrategy.getWindowFn().assignWindows(
- new FlinkNoElementAssignContext(
- windowingStrategy.getWindowFn(),
- value,
- timestamp));
-
- collector.collect(
- WindowedValue.of(
- value,
- timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
- windows,
- PaneInfo.NO_FIRING));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- } else {
- collector.collect(
- WindowedValue.of(
- value,
- timestamp,
- windowedValue.getWindows(),
- windowedValue.getPane()));
- }
- }
-
- protected void outputWithTimestampAndWindow(
- OutputT value,
- Instant timestamp,
- Collection<? extends BoundedWindow> windows,
- PaneInfo pane) {
- collector.collect(
- WindowedValue.of(
- value, timestamp, windows, pane));
- }
-
- @Override
- public <T> void sideOutput(TupleTag<T> tag, T output) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) {
- sideOutput(tag, output);
- }
-
- @Override
- protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
- createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
- SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
- new SerializableFnAggregatorWrapper<>(combiner);
- Accumulator<?, ?> existingAccum =
- (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
- if (existingAccum != null) {
- return wrapper;
- } else {
- runtimeContext.addAccumulator(name, wrapper);
- }
- return wrapper;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
new file mode 100644
index 0000000..b814015
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.StateInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * {@link OldDoFn.ProcessContext} for our Flink Wrappers.
+ */
+abstract class FlinkProcessContextBase<InputT, OutputT>
+ extends OldDoFn<InputT, OutputT>.ProcessContext {
+
+ private final PipelineOptions pipelineOptions;
+ private final RuntimeContext runtimeContext;
+ private final boolean requiresWindowAccess;
+ protected final WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy;
+ private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs;
+
+ protected WindowedValue<InputT> windowedValue;
+
+ FlinkProcessContextBase(
+ PipelineOptions pipelineOptions,
+ RuntimeContext runtimeContext,
+ OldDoFn<InputT, OutputT> doFn,
+ WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ? extends BoundedWindow>> sideInputs) {
+ doFn.super();
+ checkNotNull(pipelineOptions);
+ checkNotNull(runtimeContext);
+ checkNotNull(doFn);
+
+ this.pipelineOptions = pipelineOptions;
+ this.runtimeContext = runtimeContext;
+ this.requiresWindowAccess = doFn instanceof OldDoFn.RequiresWindowAccess;
+ this.windowingStrategy = windowingStrategy;
+ this.sideInputs = sideInputs;
+
+ super.setupDelegateAggregators();
+ }
+
+ public void setWindowedValue(WindowedValue<InputT> windowedValue) {
+ this.windowedValue = windowedValue;
+ }
+
+ @Override
+ public InputT element() {
+ return this.windowedValue.getValue();
+ }
+
+
+ @Override
+ public Instant timestamp() {
+ return windowedValue.getTimestamp();
+ }
+
+ @Override
+ public BoundedWindow window() {
+ if (!requiresWindowAccess) {
+ throw new UnsupportedOperationException(
+ "window() is only available in the context of a OldDoFn marked as RequiresWindowAccess.");
+ }
+ return Iterables.getOnlyElement(windowedValue.getWindows());
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return windowedValue.getPane();
+ }
+
+ @Override
+ public WindowingInternals<InputT, OutputT> windowingInternals() {
+
+ return new WindowingInternals<InputT, OutputT>() {
+
+ @Override
+ public StateInternals stateInternals() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void outputWindowedValue(
+ OutputT value,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ outputWithTimestampAndWindow(value, timestamp, windows, pane);
+ }
+
+ @Override
+ public <SideOutputT> void sideOutputWindowedValue(
+ TupleTag<SideOutputT> tag,
+ SideOutputT output,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TimerInternals timerInternals() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<? extends BoundedWindow> windows() {
+ return windowedValue.getWindows();
+ }
+
+ @Override
+ public PaneInfo pane() {
+ return windowedValue.getPane();
+ }
+
+ @Override
+ public <T> void writePCollectionViewData(TupleTag<?> tag,
+ Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <ViewT> ViewT sideInput(
+ PCollectionView<ViewT> view,
+ BoundedWindow mainInputWindow) {
+
+ checkNotNull(view, "View passed to sideInput cannot be null");
+ checkNotNull(
+ sideInputs.get(view),
+ "Side input for " + view + " not available.");
+
+ // get the side input strategy for mapping the window
+ WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
+
+ BoundedWindow sideInputWindow =
+ windowingStrategy.getWindowFn().getSideInputWindow(mainInputWindow);
+
+ Map<BoundedWindow, ViewT> sideInputs =
+ runtimeContext.getBroadcastVariableWithInitializer(
+ view.getTagInternal().getId(), new SideInputInitializer<>(view));
+ return sideInputs.get(sideInputWindow);
+ }
+ };
+ }
+
+ @Override
+ public PipelineOptions getPipelineOptions() {
+ return pipelineOptions;
+ }
+
+ @Override
+ public <ViewT> ViewT sideInput(PCollectionView<ViewT> view) {
+ checkNotNull(view, "View passed to sideInput cannot be null");
+ checkNotNull(sideInputs.get(view), "Side input for " + view + " not available.");
+ Iterator<? extends BoundedWindow> windowIter = windowedValue.getWindows().iterator();
+ BoundedWindow window;
+ if (!windowIter.hasNext()) {
+ throw new IllegalStateException(
+ "sideInput called when main input element is not in any windows");
+ } else {
+ window = windowIter.next();
+ if (windowIter.hasNext()) {
+ throw new IllegalStateException(
+ "sideInput called when main input element is in multiple windows");
+ }
+ }
+
+ // get the side input strategy for mapping the window
+ WindowingStrategy<?, ?> windowingStrategy = sideInputs.get(view);
+
+ BoundedWindow sideInputWindow =
+ windowingStrategy.getWindowFn().getSideInputWindow(window);
+
+ Map<BoundedWindow, ViewT> sideInputs =
+ runtimeContext.getBroadcastVariableWithInitializer(
+ view.getTagInternal().getId(), new SideInputInitializer<>(view));
+ ViewT result = sideInputs.get(sideInputWindow);
+ if (result == null) {
+ result = view.getViewFn().apply(Collections.<WindowedValue<?>>emptyList());
+ }
+ return result;
+ }
+
+ @Override
+ public void output(OutputT value) {
+ if (windowedValue != null) {
+ outputWithTimestamp(value, windowedValue.getTimestamp());
+ } else {
+ outputWithTimestamp(value, null);
+ }
+ }
+
+ @Override
+ public final void outputWithTimestamp(OutputT value, Instant timestamp) {
+ if (windowedValue == null) {
+ // we are in startBundle() or finishBundle()
+
+ try {
+ Collection windows = windowingStrategy.getWindowFn().assignWindows(
+ new FlinkNoElementAssignContext(
+ windowingStrategy.getWindowFn(),
+ value,
+ timestamp));
+
+ outputWithTimestampAndWindow(
+ value,
+ timestamp != null ? timestamp : new Instant(Long.MIN_VALUE),
+ windows,
+ PaneInfo.NO_FIRING);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ } else {
+ outputWithTimestampAndWindow(
+ value, timestamp, windowedValue.getWindows(), windowedValue.getPane());
+ }
+ }
+
+ protected abstract void outputWithTimestampAndWindow(
+ OutputT value,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane);
+
+ @Override
+ public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+ @Override
+ public abstract <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp);
+
+ @Override
+ protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+ createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) {
+ SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper =
+ new SerializableFnAggregatorWrapper<>(combiner);
+ Accumulator<?, ?> existingAccum =
+ (Accumulator<AggInputT, Serializable>) runtimeContext.getAccumulator(name);
+ if (existingAccum != null) {
+ return wrapper;
+ } else {
+ runtimeContext.addAccumulator(name, wrapper);
+ }
+ return wrapper;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
index c9b24b4..ab0c471 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java
@@ -92,14 +92,14 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
Iterable<WindowedValue<KV<K, AccumT>>> elements,
Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception {
- FlinkProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
- new FlinkProcessContext<>(
+ FlinkSingleOutputProcessContext<KV<K, AccumT>, KV<K, OutputT>> processContext =
+ new FlinkSingleOutputProcessContext<>(
serializedOptions.getPipelineOptions(),
getRuntimeContext(),
doFn,
windowingStrategy,
- out,
- sideInputs);
+ sideInputs, out
+ );
PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner =
PerKeyCombineFnRunners.create(combineFn);
@@ -150,14 +150,14 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
if (nextWindow.equals(currentWindow)) {
// continue accumulating
- processContext = processContext.forWindowedValue(nextValue);
+ processContext.setWindowedValue(nextValue);
accumulator = combineFnRunner.mergeAccumulators(
key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), processContext);
windowTimestamps.add(nextValue.getTimestamp());
} else {
// emit the value that we currently have
- processContext = processContext.forWindowedValue(currentValue);
+ processContext.setWindowedValue(currentValue);
out.collect(
WindowedValue.of(
KV.of(key, combineFnRunner.extractOutput(key, accumulator, processContext)),
@@ -179,7 +179,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow>
// if at the end of the iteration we have a change in windows
// the ProcessContext will not have been updated
- processContext = processContext.forWindowedValue(currentValue);
+ processContext.setWindowedValue(currentValue);
// emit the final accumulator
out.collect(
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fb1f7be/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
new file mode 100644
index 0000000..d67f6fd
--- /dev/null
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.flink.translation.functions;
+
+import java.util.Collection;
+import java.util.Map;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.join.RawUnionValue;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/** {@link OldDoFn.ProcessContext} for {@link FlinkDoFnFunction} with a single main output. */
+class FlinkSingleOutputProcessContext<InputT, OutputT>
+ extends FlinkProcessContextBase<InputT, OutputT> {
+
+ private final Collector<WindowedValue<OutputT>> collector;
+
+ FlinkSingleOutputProcessContext(
+ PipelineOptions pipelineOptions,
+ RuntimeContext runtimeContext,
+ OldDoFn<InputT, OutputT> doFn,
+ WindowingStrategy<?, ?> windowingStrategy,
+ Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs,
+ Collector<WindowedValue<OutputT>> collector) {
+ super(pipelineOptions, runtimeContext, doFn, windowingStrategy, sideInputs);
+ this.collector = collector;
+ }
+
+ @Override
+ protected void outputWithTimestampAndWindow(
+ OutputT value,
+ Instant timestamp,
+ Collection<? extends BoundedWindow> windows,
+ PaneInfo pane) {
+ collector.collect(WindowedValue.of(value, timestamp, windows, pane));
+ }
+
+ @Override
+ public <T> void sideOutput(TupleTag<T> tag, T value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T value, Instant timestamp) {
+ throw new UnsupportedOperationException();
+ }
+}