You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by dh...@apache.org on 2017/05/30 15:58:46 UTC

[1/3] beam git commit: [BEAM-2380] Forward additional outputs to DoFnRunner in Flink Batch

Repository: beam
Updated Branches:
  refs/heads/master 2cb4b03de -> 711faffef


[BEAM-2380] Forward additional outputs to DoFnRunner in Flink Batch


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9afe395b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9afe395b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9afe395b

Branch: refs/heads/master
Commit: 9afe395bbddd2382c5222dd3145a0be3cdf7077a
Parents: 2cb4b03
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 30 10:56:56 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 30 10:57:40 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/functions/FlinkDoFnFunction.java       | 8 +++++---
 .../translation/functions/FlinkStatefulDoFnFunction.java     | 8 +++++---
 2 files changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
index 9205bce..42a8833 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java
@@ -17,7 +17,8 @@
  */
 package org.apache.beam.runners.flink.translation.functions;
 
-import java.util.Collections;
+import com.google.common.collect.Lists;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -97,13 +98,14 @@ public class FlinkDoFnFunction<InputT, OutputT>
           new FlinkDoFnFunction.MultiDoFnOutputManager((Collector) out, outputMap);
     }
 
+    List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
+
     DoFnRunner<InputT, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(), doFn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
-        // see SimpleDoFnRunner, just use it to limit number of additional outputs
-        Collections.<TupleTag<?>>emptyList(),
+        additionalOutputTags,
         new FlinkNoOpStepContext(),
         windowingStrategy);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9afe395b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
index 6517bf2..b075768 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java
@@ -19,8 +19,9 @@ package org.apache.beam.runners.flink.translation.functions;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
-import java.util.Collections;
+import com.google.common.collect.Lists;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import org.apache.beam.runners.core.DoFnRunner;
 import org.apache.beam.runners.core.DoFnRunners;
@@ -114,13 +115,14 @@ public class FlinkStatefulDoFnFunction<K, V, OutputT>
     timerInternals.advanceProcessingTime(Instant.now());
     timerInternals.advanceSynchronizedProcessingTime(Instant.now());
 
+    List<TupleTag<?>> additionalOutputTags = Lists.newArrayList(outputMap.keySet());
+
     DoFnRunner<KV<K, V>, OutputT> doFnRunner = DoFnRunners.simpleRunner(
         serializedOptions.getPipelineOptions(), dofn,
         new FlinkSideInputReader(sideInputs, runtimeContext),
         outputManager,
         mainOutputTag,
-        // see SimpleDoFnRunner, just use it to limit number of additional outputs
-        Collections.<TupleTag<?>>emptyList(),
+        additionalOutputTags,
         new FlinkNoOpStepContext() {
           @Override
           public StateInternals stateInternals() {


[3/3] beam git commit: This closes #3258

Posted by dh...@apache.org.
This closes #3258


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/711faffe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/711faffe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/711faffe

Branch: refs/heads/master
Commit: 711faffefeb91052efc0e3f45f7c9fb42df05151
Parents: 2cb4b03 838035a
Author: Dan Halperin <dh...@google.com>
Authored: Tue May 30 08:58:36 2017 -0700
Committer: Dan Halperin <dh...@google.com>
Committed: Tue May 30 08:58:36 2017 -0700

----------------------------------------------------------------------
 .../functions/FlinkDoFnFunction.java            |  8 +-
 .../functions/FlinkStatefulDoFnFunction.java    |  8 +-
 .../wrappers/streaming/DoFnOperator.java        | 77 +++++++++++++++-----
 3 files changed, 70 insertions(+), 23 deletions(-)
----------------------------------------------------------------------



[2/3] beam git commit: Fix flushing of pushed-back data in Flink Runner on +Inf watermark

Posted by dh...@apache.org.
Fix flushing of pushed-back data in Flink Runner on +Inf watermark

Before, we only flushed all pushed-back data when receiving a +Inf
watermark on the side input. It can happen that we receive that
watermark before getting any data on the main input. This changes
DoFnOperator to also flush when we receive a main-input watermark and
have already received the +Inf watermark on the side input.

Some tests were flaky because of this. One example was
ViewTest.testWindowedSideInputFixedToFixedWithDefault().


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/838035a4
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/838035a4
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/838035a4

Branch: refs/heads/master
Commit: 838035a4069b152143859e9b34570b15254d69b3
Parents: 9afe395
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Tue May 30 15:19:27 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue May 30 15:23:55 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        | 77 +++++++++++++++-----
 1 file changed, 60 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/838035a4/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index d2ab7e1..e473046 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.flink.translation.wrappers.streaming;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.collect.Iterables;
 import java.io.DataInputStream;
@@ -129,6 +130,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
 
   protected transient long currentInputWatermark;
 
+  protected transient long currentSideInputWatermark;
+
   protected transient long currentOutputWatermark;
 
   private transient StateTag<BagState<WindowedValue<InputT>>> pushedBackTag;
@@ -197,6 +200,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     super.open();
 
     setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    setCurrentSideInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
     setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
 
     sideInputReader = NullSideInputReader.of(sideInputs);
@@ -308,6 +312,21 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   @Override
   public void close() throws Exception {
     super.close();
+
+    // sanity check: these should have been flushed out by +Inf watermarks
+    if (pushbackStateInternals != null) {
+      BagState<WindowedValue<InputT>> pushedBack =
+          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+
+      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+      if (pushedBackContents != null) {
+        if (!Iterables.isEmpty(pushedBackContents)) {
+          String pushedBackString = Joiner.on(",").join(pushedBackContents);
+          throw new RuntimeException(
+              "Leftover pushed-back data: " + pushedBackString + ". This indicates a bug.");
+        }
+      }
+    }
     doFnInvoker.invokeTeardown();
   }
 
@@ -457,36 +476,56 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       }
       pushbackDoFnRunner.finishBundle();
     }
+
+    // We do the check here because we are guaranteed to at least get the +Inf watermark on the
+    // main input when the job finishes.
+    if (currentSideInputWatermark >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+      // this means we will never see any more side input
+      // we also do the check here because we might have received the side-input MAX watermark
+      // before receiving any main-input data
+      emitAllPushedBackData();
+    }
   }
 
   @Override
   public void processWatermark2(Watermark mark) throws Exception {
-    if (mark.getTimestamp() == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
+    setCurrentSideInputWatermark(mark.getTimestamp());
+    if (mark.getTimestamp() >= BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
       // this means we will never see any more side input
-      pushbackDoFnRunner.startBundle();
+      emitAllPushedBackData();
 
-      BagState<WindowedValue<InputT>> pushedBack =
-          pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
+      // maybe output a new watermark
+      processWatermark1(new Watermark(currentInputWatermark));
+    }
+  }
 
-      Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
-      if (pushedBackContents != null) {
-        for (WindowedValue<InputT> elem : pushedBackContents) {
+  /**
+   * Emits all pushed-back data. This should be used once we know that there will not be
+   * any future side input, i.e. that there is no point in waiting.
+   */
+  private void emitAllPushedBackData() throws Exception {
+    pushbackDoFnRunner.startBundle();
 
-          // we need to set the correct key in case the operator is
-          // a (keyed) window operator
-          setKeyContextElement1(new StreamRecord<>(elem));
+    BagState<WindowedValue<InputT>> pushedBack =
+        pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-          doFnRunner.processElement(elem);
-        }
+    Iterable<WindowedValue<InputT>> pushedBackContents = pushedBack.read();
+    if (pushedBackContents != null) {
+      for (WindowedValue<InputT> elem : pushedBackContents) {
+
+        // we need to set the correct key in case the operator is
+        // a (keyed) window operator
+        setKeyContextElement1(new StreamRecord<>(elem));
+
+        doFnRunner.processElement(elem);
       }
+    }
 
-      setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    pushedBack.clear();
 
-      pushbackDoFnRunner.finishBundle();
+    setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
 
-      // maybe output a new watermark
-      processWatermark1(new Watermark(currentInputWatermark));
-    }
+    pushbackDoFnRunner.finishBundle();
   }
 
   @Override
@@ -610,6 +649,10 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
     this.currentInputWatermark = currentInputWatermark;
   }
 
+  private void setCurrentSideInputWatermark(long currentInputWatermark) {
+    this.currentSideInputWatermark = currentInputWatermark;
+  }
+
   private void setCurrentOutputWatermark(long currentOutputWatermark) {
     this.currentOutputWatermark = currentOutputWatermark;
   }