You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by av...@apache.org on 2017/05/04 18:10:36 UTC

[4/6] beam git commit: [BEAM-1726] Fix empty side inputs in Flink Streaming Runner

[BEAM-1726] Fix empty side inputs in Flink Streaming Runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5555040d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5555040d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5555040d

Branch: refs/heads/master
Commit: 5555040d935c67f5cd48f2ffe2721a07fe6e0a50
Parents: 95ade45
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Sat Mar 18 12:16:06 2017 +0100
Committer: Aviem Zur <av...@gmail.com>
Committed: Thu May 4 20:48:56 2017 +0300

----------------------------------------------------------------------
 .../beam/runners/core/SideInputHandler.java     | 10 ++++----
 .../wrappers/streaming/DoFnOperator.java        | 27 +++++++++++++++++++-
 2 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
index 5c67148..b29f9d0 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SideInputHandler.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.core;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -161,11 +162,6 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
   @Override
   public <T> T get(PCollectionView<T> sideInput, BoundedWindow window) {
 
-    if (!isReady(sideInput, window)) {
-      throw new IllegalStateException(
-          "Side input " + sideInput + " is not ready for window " + window);
-    }
-
     @SuppressWarnings("unchecked")
     Coder<BoundedWindow> windowCoder =
         (Coder<BoundedWindow>) sideInput
@@ -181,6 +177,10 @@ public class SideInputHandler implements ReadyCheckingSideInputReader {
 
     Iterable<WindowedValue<?>> elements = state.read();
 
+    if (elements == null) {
+      elements = Collections.emptyList();
+    }
+
     return sideInput.getViewFn().apply(elements);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5555040d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index c624036..16bf5d2 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -463,7 +463,32 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
-    // ignore watermarks from the side-input input
+    if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // this means we will never see any more side input
+      pushbackDoFnRunner.startBundle();
+
+      BagState<WindowedValue<InputT>> pushedBack =
+          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+      if (pushedBackContents != null) {
+        for (WindowedValue<InputT> elem : pushedBackContents) {
+
+          // we need to set the correct key in case the operator is
+          // a (keyed) window operator
+          setKeyContextElement1(new StreamRecord<>(elem));
+
+          doFnRunner.processElement(elem);
+        }
+      }
+
+      setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+
+      pushbackDoFnRunner.finishBundle();
+
+      // maybe output a new watermark
+      processWatermark1(new Watermark(currentInputWatermark));
+    }
   }
 
   @Override