You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/30 15:58:46 UTC
[1/3] beam git commit: [BEAM-2380] Forward additional outputs to
DoFnRunner in Flink Batch
Repository: beam
Updated Branches:
refs/heads/master 2cb4b03de -> 711faffef
[BEAM-2380] Forward additional outputs to DoFnRunner in Flink Batch
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9afe395b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9afe395b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9afe395b
Branch: refs/heads/master
Commit: 9afe395bbddd2382c5222dd3145a0be3cdf7077a
Parents: 2cb4b03
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 30 10:56:56 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 30 10:57:40 2017 +0200
----------------------------------------------------------------------
.../flink/translation/functions/FlinkDoFnFunction.java | 8 +++++---
.../translation/functions/FlinkStatefulDoFnFunction.java | 8 +++++---
2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 9205bce..42a8833 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -17,7 +17,8 @@
*/
package org.apache.beam.runners.flink.translation.functions;
-import java.util.Collections;
+import com.google.common.collect.Lists;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
@@ -97,13 +98,14 @@ public class FlinkDoFnFunction<InputT, OutputT>
new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
}
+ List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
+
DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(), doFn,
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
- // see SimpleDoFnRunner, just use it to limit number of additional outputs
- Collections.<TupleTag<?>>emptyList(),
+ additionalOutputTags,
new FlinkNoOpStepContext(),
windowingStrategy);
http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 6517bf2..b075768 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -19,8 +19,9 @@ package org.apache.beam.runners.flink.translation.functions;
import static org.apache.flink.util.Preconditions.checkArgument;
-import java.util.Collections;
+import com.google.common.collect.Lists;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import org.apache.beam.runners.core.DoFnRunner;
import org.apache.beam.runners.core.DoFnRunners;
@@ -114,13 +115,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
timerInternals.advanceProcessingTime(Instant.now());
timerInternals.advanceSynchronizedProcessingTime(Instant.now());
+ List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
+
DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
serializedOptions.getPipelineOptions(), dofn,
new FlinkSideInputReader(sideInputs, runtimeContext),
outputManager,
mainOutputTag,
- // see SimpleDoFnRunner, just use it to limit number of additional outputs
- Collections.<TupleTag<?>>emptyList(),
+ additionalOutputTags,
new FlinkNoOpStepContext() {
@Override
public StateInternals stateInternals() {
[3/3] beam git commit: This closes #3258
Posted by dh...@apache.org.
This closes #3258
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/711faffe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/711faffe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/711faffe
Branch: refs/heads/master
Commit: 711faffefeb91052efc0e3f45f7c9fb42df05151
Parents: 2cb4b03 838035a
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 30 08:58:36 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 30 08:58:36 2017 -0700
----------------------------------------------------------------------
.../functions/FlinkDoFnFunction.java | 8 +-
.../functions/FlinkStatefulDoFnFunction.java | 8 +-
.../wrappers/streaming/DoFnOperator.java | 77 +++++++++++++++-----
3 files changed, 70 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
[2/3] beam git commit: Fix flushing of pushed-back data in Flink
Runner on +Inf watermark
Posted by dh...@apache.org.
Fix flushing of pushed-back data in Flink Runner on +Inf watermark
Before, we only flushed all pushed-back data when receiving a +Inf
watermark on the side input. It can happen that we receive that
watermark before getting any data on the main input. This changes
DoFnOperator to also flush when it receives a main-input watermark after
the +Inf watermark has already arrived on the side input.
Some tests were flaky because of this. One example was
ViewTest.testWindowedSideInputFixedToFixedWithDefault().
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/838035a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/838035a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/838035a4
Branch: refs/heads/master
Commit: 838035a4069b152143859e9b34570b15254d69b3
Parents: 9afe395
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 30 15:19:27 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 30 15:23:55 2017 +0200
----------------------------------------------------------------------
.../wrappers/streaming/DoFnOperator.java | 77 +++++++++++++++-----
1 file changed, 60 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/838035a4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index d2ab7e1..e473046 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
import static org.apache.flink.util.Preconditions.checkArgument;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import java.io.DataInputStream;
@@ -129,6 +130,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
protected transient long currentInputWatermark;
+ protected transient long currentSideInputWatermark;
+
protected transient long currentOutputWatermark;
private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
@@ -197,6 +200,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
super.open();
setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+ setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
sideInputReader = NullSideInputReader.of(sideInputs);
@@ -308,6 +312,21 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public void close() throws Exception {
super.close();
+
+ // sanity check: these should have been flushed out by +Inf watermarks
+ if (pushbackStateInternals != null) {
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+ if (pushedBackContents != null) {
+ if (!Iterables.isEmpty(pushedBackContents)) {
+ String pushedBackString = Joiner.on(",").join(pushedBackContents);
+ throw new RuntimeException(
+ "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
+ }
+ }
+ }
doFnInvoker.invokeTeardown();
}
@@ -457,36 +476,56 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
}
pushbackDoFnRunner.finishBundle();
}
+
+ // We do the check here because we are guaranteed to at least get the +Inf watermark on the
+ // main input when the job finishes.
+ if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ // this means we will never see any more side input
+ // we also do the check here because we might have received the side-input MAX watermark
+ // before receiving any main-input data
+ emitAllPushedBackData();
+ }
}
@Override
public void processWatermark2(Watermark mark) throws Exception {
- if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ setCurrentSideInputWatermark(mark.getTimestamp());
+ if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
// this means we will never see any more side input
- pushbackDoFnRunner.startBundle();
+ emitAllPushedBackData();
- BagState<WindowedValue<InputT>> pushedBack =
- pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+ // maybe output a new watermark
+ processWatermark1(new Watermark(currentInputWatermark));
+ }
+ }
- Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
- if (pushedBackContents != null) {
- for (WindowedValue<InputT> elem : pushedBackContents) {
+ /**
+ * Emits all pushed-back data. This should be used once we know that there will not be
+ * any future side input, i.e. that there is no point in waiting.
+ */
+ private void emitAllPushedBackData() throws Exception {
+ pushbackDoFnRunner.startBundle();
- // we need to set the correct key in case the operator is
- // a (keyed) window operator
- setKeyContextElement1(new StreamRecord<>(elem));
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
- doFnRunner.processElement(elem);
- }
+ Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+ if (pushedBackContents != null) {
+ for (WindowedValue<InputT> elem : pushedBackContents) {
+
+ // we need to set the correct key in case the operator is
+ // a (keyed) window operator
+ setKeyContextElement1(new StreamRecord<>(elem));
+
+ doFnRunner.processElement(elem);
}
+ }
- setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+ pushedBack.clear();
- pushbackDoFnRunner.finishBundle();
+ setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
- // maybe output a new watermark
- processWatermark1(new Watermark(currentInputWatermark));
- }
+ pushbackDoFnRunner.finishBundle();
}
@Override
@@ -610,6 +649,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
this.currentInputWatermark = currentInputWatermark;
}
+ private void setCurrentSideInputWatermark(long currentInputWatermark) {
+ this.currentSideInputWatermark = currentInputWatermark;
+ }
+
private void setCurrentOutputWatermark(long currentOutputWatermark) {
this.currentOutputWatermark = currentOutputWatermark;
}