You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by al...@apache.org on 2017/07/24 08:04:17 UTC

[1/4] beam git commit: [BEAM-2571] Respect watermark contract in Flink DoFnOperator

Repository: beam
Updated Branches:
  refs/heads/master 5f1d1365b -> f54072a1b


[BEAM-2571] Respect watermark contract in Flink DoFnOperator

In Flink, a watermark T specifies that there will be no elements with a
timestamp <= T in the future. In Beam, a watermark T specifies that
there will be no elements with a timestamp < T in the future. This leads
to problems when the watermark is exactly "on the timer timestamp", most
prominently, this happened with Triggers, where Flink would fire the
Trigger too early and the Trigger would determine (based on the
watermark) that it is not yet time to fire the window while Flink
thought it was time.

This also adds a test that specifically tests the edge case.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/18250878
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/18250878
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/18250878

Branch: refs/heads/master
Commit: 18250878552ede370e90b89e66f17664edaf2c7e
Parents: b03c4f0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jul 12 15:38:06 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jul 24 09:47:44 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        |  13 ++-
 .../flink/streaming/DoFnOperatorTest.java       | 117 ++++++++++++++++++-
 2 files changed, 128 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/18250878/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 3b234ac..7995ea8 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -472,7 +472,7 @@ public class DoFnOperator<InputT, OutputT>
       // hold back by the pushed back values waiting for side inputs
       long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
 
-      timerService.advanceWatermark(pushedBackInputWatermark);
+      timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark));
 
       Instant watermarkHold = stateInternals.watermarkHold();
 
@@ -501,6 +501,17 @@ public class DoFnOperator<InputT, OutputT>
   }
 
   /**
+   * Converts a Beam watermark to a Flink watermark. This is only relevant when considering what
+   * event-time timers to fire: in Beam, a watermark {@code T} says there will not be any elements
+   * with a timestamp {@code < T} in the future. A Flink watermark {@code T} says there will not be
+   * any elements with a timestamp {@code <= T} in the future. We correct this by subtracting
+   * {@code 1} from a Beam watermark before passing to any relevant Flink runtime components.
+   */
+  private static long toFlinkRuntimeWatermark(long beamWatermark) {
+    return beamWatermark - 1;
+  }
+
+  /**
    * Emits all pushed-back data. This should be used once we know that there will not be
    * any future side input, i.e. that there is no point in waiting.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/18250878/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index ad9d236..4d2a912 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -197,6 +198,118 @@ public class DoFnOperatorTest {
     testHarness.close();
   }
 
+  /**
+   * This test specifically verifies that we correctly map Flink watermarks to Beam watermarks. In
+   * Beam, a watermark {@code T} guarantees there will not be elements with a timestamp
+   * {@code < T} in the future. In Flink, a watermark {@code T} guarantees there will not be
+   * elements with a timestamp {@code <= T} in the future. We have to make sure to take this into
+   * account when firing timers.
+   *
+   * <p>This does not test the timer API in general or processing-time timers because there are
+   * generic tests for this in {@code ParDoTest}.
+   */
+  @Test
+  public void testWatermarkContract() throws Exception {
+
+    final Instant timerTimestamp = new Instant(1000);
+    final String outputMessage = "Timer fired";
+
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(new Duration(10_000)));
+
+    DoFn<Integer, String> fn = new DoFn<Integer, String>() {
+      private static final String EVENT_TIMER_ID = "eventTimer";
+
+      @TimerId(EVENT_TIMER_ID)
+      private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+      @ProcessElement
+      public void processElement(ProcessContext context, @TimerId(EVENT_TIMER_ID) Timer timer) {
+        timer.set(timerTimestamp);
+      }
+
+      @OnTimer(EVENT_TIMER_ID)
+      public void onEventTime(OnTimerContext context) {
+        assertEquals(
+            "Timer timestamp must match set timestamp.", timerTimestamp, context.timestamp());
+        context.outputWithTimestamp(outputMessage, context.timestamp());
+      }
+    };
+
+    WindowedValue.FullWindowedValueCoder<Integer> inputCoder =
+        WindowedValue.getFullCoder(
+            VarIntCoder.of(),
+            windowingStrategy.getWindowFn().windowCoder());
+
+    WindowedValue.FullWindowedValueCoder<String> outputCoder =
+        WindowedValue.getFullCoder(
+            StringUtf8Coder.of(),
+            windowingStrategy.getWindowFn().windowCoder());
+
+
+    TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+    DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>(
+        fn,
+        "stepName",
+        inputCoder,
+        outputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        windowingStrategy,
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        VarIntCoder.of() /* key coder */);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
+        new KeyedOneInputStreamOperatorTestHarness<>(
+            doFnOperator,
+            new KeySelector<WindowedValue<Integer>, Integer>() {
+              @Override
+              public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception {
+                return integerWindowedValue.getValue();
+              }
+            },
+            new CoderTypeInformation<>(VarIntCoder.of()));
+
+    testHarness.setup(new CoderTypeSerializer<>(outputCoder));
+
+    testHarness.open();
+
+    testHarness.processWatermark(0);
+
+    IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10_000));
+
+    // this should register a timer
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        emptyIterable());
+
+    // this does not yet fire the timer (in vanilla Flink it would)
+    testHarness.processWatermark(timerTimestamp.getMillis());
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        emptyIterable());
+
+    testHarness.getOutput().clear();
+
+    // this must fire the timer
+    testHarness.processWatermark(timerTimestamp.getMillis() + 1);
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.of(
+                outputMessage, new Instant(timerTimestamp), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.close();
+  }
+
 
   @Test
   public void testLateDroppingForStatefulFn() throws Exception {
@@ -394,11 +507,13 @@ public class DoFnOperatorTest {
 
     // this should trigger both the window.maxTimestamp() timer and the GC timer
     // this tests that the GC timer fires after the user timer
+    // we have to add 1 here because Flink timers fire when watermark >= timestamp while Beam
+    // timers fire when watermark > timestamp
     testHarness.processWatermark(
         window1.maxTimestamp()
             .plus(windowingStrategy.getAllowedLateness())
             .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
-            .getMillis());
+            .getMillis() + 1);
 
     assertThat(
         this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),


[3/4] beam git commit: [BEAM-2571] Change DoFnOperator to use Long.MAX_VALUE as max watermark

Posted by al...@apache.org.
[BEAM-2571] Change DoFnOperator to use Long.MAX_VALUE as max watermark

This is in line with what Flink does and what BoundedSourceWrapper and
UnboundedSourceWrapper do.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/84499317
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/84499317
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/84499317

Branch: refs/heads/master
Commit: 8449931708338dd854ba90b8d6f769fe42b81493
Parents: 5f1d136
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jul 12 14:42:37 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jul 24 09:47:44 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java   | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/84499317/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index a80f7b6..b1f3b86 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -334,7 +334,7 @@ public class DoFnOperator<InputT, OutputT>
   protected final long getPushbackWatermarkHold() {
     // if we don't have side inputs we never hold the watermark
     if (sideInputs.isEmpty()) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+      return Long.MAX_VALUE;
     }
 
     try {
@@ -353,7 +353,7 @@ public class DoFnOperator<InputT, OutputT>
       BagState<WindowedValue<InputT>> pushedBack =
           pushbackStateInternals.state(StateNamespaces.global(), pushedBackTag);
 
-      long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+      long min = Long.MAX_VALUE;
       for (WindowedValue<InputT> value : pushedBack.read()) {
         min = Math.min(min, value.getTimestamp().getMillis());
       }
@@ -426,7 +426,7 @@ public class DoFnOperator<InputT, OutputT>
     }
 
     pushedBack.clear();
-    long min = BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis();
+    long min = Long.MAX_VALUE;
     for (WindowedValue<InputT> pushedBackValue : newPushedBack) {
       min = Math.min(min, pushedBackValue.getTimestamp().getMillis());
       pushedBack.add(pushedBackValue);
@@ -524,7 +524,7 @@ public class DoFnOperator<InputT, OutputT>
 
     pushedBack.clear();
 
-    setPushedBackWatermark(BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis());
+    setPushedBackWatermark(Long.MAX_VALUE);
 
     pushbackDoFnRunner.finishBundle();
   }


[2/4] beam git commit: [BEAM-2571] Clarify pushedback variable name in DoFnOperator

Posted by al...@apache.org.
[BEAM-2571] Clarify pushedback variable name in DoFnOperator


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b03c4f07
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b03c4f07
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b03c4f07

Branch: refs/heads/master
Commit: b03c4f0790fa639d739f7f3fdeaa4a703fadb8fa
Parents: 8449931
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jul 12 14:39:58 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jul 24 09:47:44 2017 +0200

----------------------------------------------------------------------
 .../flink/translation/wrappers/streaming/DoFnOperator.java     | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b03c4f07/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index b1f3b86..3b234ac 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -470,15 +470,15 @@ public class DoFnOperator<InputT, OutputT>
       setCurrentInputWatermark(mark.getTimestamp());
 
       // hold back by the pushed back values waiting for side inputs
-      long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
+      long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
 
-      timerService.advanceWatermark(actualInputWatermark);
+      timerService.advanceWatermark(pushedBackInputWatermark);
 
       Instant watermarkHold = stateInternals.watermarkHold();
 
       long combinedWatermarkHold = Math.min(watermarkHold.getMillis(), getPushbackWatermarkHold());
 
-      long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
+      long potentialOutputWatermark = Math.min(pushedBackInputWatermark, combinedWatermarkHold);
 
       if (potentialOutputWatermark > currentOutputWatermark) {
         setCurrentOutputWatermark(potentialOutputWatermark);


[4/4] beam git commit: This closes #3600

Posted by al...@apache.org.
This closes #3600


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/f54072a1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/f54072a1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/f54072a1

Branch: refs/heads/master
Commit: f54072a1b156176624db5b666ed9c308f24bc2f9
Parents: 5f1d136 1825087
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Jul 24 10:03:39 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jul 24 10:03:39 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        |  25 ++--
 .../flink/streaming/DoFnOperatorTest.java       | 117 ++++++++++++++++++-
 2 files changed, 134 insertions(+), 8 deletions(-)
----------------------------------------------------------------------