You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:36 UTC
[4/6] beam git commit: [BEAM-1726] Fix empty side inputs in Flink
Streaming Runner
[BEAM-1726] Fix empty side inputs in Flink Streaming Runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5555040d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5555040d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5555040d
Branch: refs/heads/master
Commit: 5555040d935c67f5cd48f2ffe2721a07fe6e0a50
Parents: 95ade45
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Mar 18 12:16:06 2017 +0100
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300
----------------------------------------------------------------------
.../beam/runners/core/SideInputHandler.java | 10 ++++----
.../wrappers/streaming/DoFnOperator.java | 27 +++++++++++++++++++-
2 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 5c67148..b29f9d0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -161,11 +162,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
@Override
public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
- if (!isReady(sideInput, window)) {
- throw new IllegalStateException(
- "Side input " + sideInput + " is not ready for window " + window);
- }
-
@SuppressWarnings("unchecked")
Coder<BoundedWindow> windowCoder =
(Coder<BoundedWindow>) sideInput
@@ -181,6 +177,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
Iterable<WindowedValue<?>> elements = state.read();
+ if (elements == null) {
+ elements = Collections.emptyList();
+ }
+
return sideInput.getViewFn().apply(elements);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c624036..16bf5d2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -463,7 +463,32 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public void processWatermark2(Watermark mark) throws Exception {
- // ignore watermarks from the side-input input
+ if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+ // this means we will never see any more side input
+ pushbackDoFnRunner.startBundle();
+
+ BagState<WindowedValue<InputT>> pushedBack =
+ pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+ Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+ if (pushedBackContents != null) {
+ for (WindowedValue<InputT> elem : pushedBackContents) {
+
+ // we need to set the correct key in case the operator is
+ // a (keyed) window operator
+ setKeyContextElement1(new StreamRecord<>(elem));
+
+ doFnRunner.processElement(elem);
+ }
+ }
+
+ setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+ pushbackDoFnRunner.finishBundle();
+
+ // maybe output a new watermark
+ processWatermark1(new Watermark(currentInputWatermark));
+ }
}
@Override