You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by jb...@apache.org on 2017/07/24 13:55:24 UTC

[3/4] beam git commit: [BEAM-2571] Respect watermark contract in Flink DoFnOperator

[BEAM-2571] Respect watermark contract in Flink DoFnOperator

In Flink, a watermark T specifies that there will be no elements with a
timestamp <= T in the future. In Beam, a watermark T specifies that
there will be no elements with a timestamp < T in the future. This leads
to problems when the watermark is exactly "on the timer timestamp". Most
prominently, this happened with Triggers: Flink would fire the Trigger
too early, and the Trigger would determine (based on the watermark) that
it is not yet time to fire the window even though Flink thought it was
time.

This also adds a test that specifically tests the edge case.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/ade50652
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/ade50652
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/ade50652

Branch: refs/heads/release-2.1.0
Commit: ade506526b4ff56eb4ed15e9eea04d1d3345bc13
Parents: 5c4a95a
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Jul 12 15:38:06 2017 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Jul 24 14:29:56 2017 +0200

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        |  13 ++-
 .../flink/streaming/DoFnOperatorTest.java       | 117 ++++++++++++++++++-
 2 files changed, 128 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/ade50652/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 8da8de5..8884ce1 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -472,7 +472,7 @@ public class DoFnOperator<InputT, OutputT>
       // hold back by the pushed back values waiting for side inputs
       long pushedBackInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
 
-      timerService.advanceWatermark(pushedBackInputWatermark);
+      timerService.advanceWatermark(toFlinkRuntimeWatermark(pushedBackInputWatermark));
 
       Instant watermarkHold = stateInternals.watermarkHold();
 
@@ -501,6 +501,17 @@ public class DoFnOperator<InputT, OutputT>
   }
 
   /**
+   * Converts a Beam watermark to a Flink watermark. This is only relevant when considering what
+   * event-time timers to fire: in Beam, a watermark {@code T} says there will not be any elements
+   * with a timestamp {@code < T} in the future. A Flink watermark {@code T} says there will not be
+   * any elements with a timestamp {@code <= T} in the future. We correct this by subtracting
+   * {@code 1} from a Beam watermark before passing to any relevant Flink runtime components.
+   */
+  private static long toFlinkRuntimeWatermark(long beamWatermark) {
+    return beamWatermark - 1;
+  }
+
+  /**
    * Emits all pushed-back data. This should be used once we know that there will not be
    * any future side input, i.e. that there is no point in waiting.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/ade50652/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
index ad9d236..4d2a912 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java
@@ -33,6 +33,7 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.core.StatefulDoFnRunner;
 import org.apache.beam.runners.flink.FlinkPipelineOptions;
 import org.apache.beam.runners.flink.translation.types.CoderTypeInformation;
+import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer;
 import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
@@ -197,6 +198,118 @@ public class DoFnOperatorTest {
     testHarness.close();
   }
 
+  /**
+   * This test specifically verifies that we correctly map Flink watermarks to Beam watermarks. In
+   * Beam, a watermark {@code T} guarantees there will not be elements with a timestamp
+   * {@code < T} in the future. In Flink, a watermark {@code T} guarantees there will not be
+   * elements with a timestamp {@code <= T} in the future. We have to make sure to take this into
+   * account when firing timers.
+   *
+   * <p>This does not test the timer API in general or processing-time timers because there are
+   * generic tests for this in {@code ParDoTest}.
+   */
+  @Test
+  public void testWatermarkContract() throws Exception {
+
+    final Instant timerTimestamp = new Instant(1000);
+    final String outputMessage = "Timer fired";
+
+    WindowingStrategy<Object, IntervalWindow> windowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(new Duration(10_000)));
+
+    DoFn<Integer, String> fn = new DoFn<Integer, String>() {
+      private static final String EVENT_TIMER_ID = "eventTimer";
+
+      @TimerId(EVENT_TIMER_ID)
+      private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+      @ProcessElement
+      public void processElement(ProcessContext context, @TimerId(EVENT_TIMER_ID) Timer timer) {
+        timer.set(timerTimestamp);
+      }
+
+      @OnTimer(EVENT_TIMER_ID)
+      public void onEventTime(OnTimerContext context) {
+        assertEquals(
+            "Timer timestamp must match set timestamp.", timerTimestamp, context.timestamp());
+        context.outputWithTimestamp(outputMessage, context.timestamp());
+      }
+    };
+
+    WindowedValue.FullWindowedValueCoder<Integer> inputCoder =
+        WindowedValue.getFullCoder(
+            VarIntCoder.of(),
+            windowingStrategy.getWindowFn().windowCoder());
+
+    WindowedValue.FullWindowedValueCoder<String> outputCoder =
+        WindowedValue.getFullCoder(
+            StringUtf8Coder.of(),
+            windowingStrategy.getWindowFn().windowCoder());
+
+
+    TupleTag<String> outputTag = new TupleTag<>("main-output");
+
+    DoFnOperator<Integer, String> doFnOperator = new DoFnOperator<>(
+        fn,
+        "stepName",
+        inputCoder,
+        outputTag,
+        Collections.<TupleTag<?>>emptyList(),
+        new DoFnOperator.DefaultOutputManagerFactory<String>(),
+        windowingStrategy,
+        new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */
+        Collections.<PCollectionView<?>>emptyList(), /* side inputs */
+        PipelineOptionsFactory.as(FlinkPipelineOptions.class),
+        VarIntCoder.of() /* key coder */);
+
+    OneInputStreamOperatorTestHarness<WindowedValue<Integer>, WindowedValue<String>> testHarness =
+        new KeyedOneInputStreamOperatorTestHarness<>(
+            doFnOperator,
+            new KeySelector<WindowedValue<Integer>, Integer>() {
+              @Override
+              public Integer getKey(WindowedValue<Integer> integerWindowedValue) throws Exception {
+                return integerWindowedValue.getValue();
+              }
+            },
+            new CoderTypeInformation<>(VarIntCoder.of()));
+
+    testHarness.setup(new CoderTypeSerializer<>(outputCoder));
+
+    testHarness.open();
+
+    testHarness.processWatermark(0);
+
+    IntervalWindow window1 = new IntervalWindow(new Instant(0), Duration.millis(10_000));
+
+    // this should register a timer
+    testHarness.processElement(
+        new StreamRecord<>(WindowedValue.of(13, new Instant(0), window1, PaneInfo.NO_FIRING)));
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        emptyIterable());
+
+    // this does not yet fire the timer (in vanilla Flink it would)
+    testHarness.processWatermark(timerTimestamp.getMillis());
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        emptyIterable());
+
+    testHarness.getOutput().clear();
+
+    // this must fire the timer
+    testHarness.processWatermark(timerTimestamp.getMillis() + 1);
+
+    assertThat(
+        this.<String>stripStreamRecordFromWindowedValue(testHarness.getOutput()),
+        contains(
+            WindowedValue.of(
+                outputMessage, new Instant(timerTimestamp), window1, PaneInfo.NO_FIRING)));
+
+    testHarness.close();
+  }
+
 
   @Test
   public void testLateDroppingForStatefulFn() throws Exception {
@@ -394,11 +507,13 @@ public class DoFnOperatorTest {
 
     // this should trigger both the window.maxTimestamp() timer and the GC timer
     // this tests that the GC timer fires after the user timer
+    // we have to add 1 here because Flink timers fire when watermark >= timestamp while Beam
+    // timers fire when watermark > timestamp
     testHarness.processWatermark(
         window1.maxTimestamp()
             .plus(windowingStrategy.getAllowedLateness())
             .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
-            .getMillis());
+            .getMillis() + 1);
 
     assertThat(
         this.<KV<String, Integer>>stripStreamRecordFromWindowedValue(testHarness.getOutput()),