You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 20:27:50 UTC

[1/3] beam git commit: Fix initial watermark of DoFnOperator in Flink runner

Repository: beam
Updated Branches:
  refs/heads/master e2aa8892e -> 9fffa7efa


Fix initial watermark of DoFnOperator in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7ece1647
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7ece1647
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7ece1647

Branch: refs/heads/master
Commit: 7ece1647d6aeab4544d994a3a00360919920a978
Parents: 4f93492
Author: JingsongLi <lz...@aliyun.com>
Authored: Tue May 2 17:36:46 2017 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:27:05 2017 -0700

----------------------------------------------------------------------
 .../wrappers/streaming/DoFnOperator.java        | 20 ++++++++++++++------
 1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/7ece1647/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 16bf5d2..518d6be 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -198,8 +198,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   public void open() throws Exception {
     super.open();
 
-    currentInputWatermark = Long.MIN_VALUE;
-    currentOutputWatermark = Long.MIN_VALUE;
+    setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+    setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
 
     sideInputReader = NullSideInputReader.of(sideInputs);
 
@@ -429,18 +429,18 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
   @Override
   public void processWatermark1(Watermark mark) throws Exception {
     if (keyCoder == null) {
-      this.currentInputWatermark = mark.getTimestamp();
+      setCurrentInputWatermark(mark.getTimestamp());
       long potentialOutputWatermark =
           Math.min(getPushbackWatermarkHold(), currentInputWatermark);
       if (potentialOutputWatermark > currentOutputWatermark) {
-        currentOutputWatermark = potentialOutputWatermark;
+        setCurrentOutputWatermark(potentialOutputWatermark);
         output.emitWatermark(new Watermark(currentOutputWatermark));
       }
     } else {
       // fireTimers, so we need startBundle.
       pushbackDoFnRunner.startBundle();
 
-      this.currentInputWatermark = mark.getTimestamp();
+      setCurrentInputWatermark(mark.getTimestamp());
 
       // hold back by the pushed back values waiting for side inputs
       long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
@@ -454,7 +454,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
       long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
 
       if (potentialOutputWatermark > currentOutputWatermark) {
-        currentOutputWatermark = potentialOutputWatermark;
+        setCurrentOutputWatermark(potentialOutputWatermark);
         output.emitWatermark(new Watermark(currentOutputWatermark));
       }
       pushbackDoFnRunner.finishBundle();
@@ -608,6 +608,14 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
         timerData.getTimestamp(), timerData.getDomain());
   }
 
+  private void setCurrentInputWatermark(long currentInputWatermark) {
+    this.currentInputWatermark = currentInputWatermark;
+  }
+
+  private void setCurrentOutputWatermark(long currentOutputWatermark) {
+    this.currentOutputWatermark = currentOutputWatermark;
+  }
+
   /**
    * Factory for creating an {@link DoFnRunners.OutputManager} from
    * a Flink {@link Output}.


[2/3] beam git commit: [BEAM-1727] Add align and offset to Timer

Posted by ke...@apache.org.
[BEAM-1727] Add align and offset to Timer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f934923
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f934923
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f934923

Branch: refs/heads/master
Commit: 4f934923d28798dfe7cd18c86ff4bcf8eebc27e5
Parents: e2aa889
Author: JingsongLi <lz...@aliyun.com>
Authored: Mon Mar 20 12:12:31 2017 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:27:05 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformMatchersTest.java    |   2 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |  44 +++++++-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   2 +-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   2 +-
 .../java/org/apache/beam/sdk/util/Timer.java    |  27 +++--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 113 ++++++++++++++++++-
 6 files changed, 171 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index e7d4c64..bb1b1cd 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -193,7 +193,7 @@ public class PTransformMatchersTest implements Serializable {
 
         @ProcessElement
         public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-          timer.setForNowPlus(Duration.standardSeconds(1));
+          timer.offset(Duration.standardSeconds(1)).setRelative();
           context.output(3);
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 8a3e25f..7f29a6f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -788,6 +788,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     private final StateNamespace namespace;
     private final String timerId;
     private final TimerSpec spec;
+    private Duration period = Duration.ZERO;
+    private Duration offset = Duration.ZERO;
 
     public TimerInternalsTimer(
         BoundedWindow window,
@@ -812,12 +814,45 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
     }
 
     @Override
-    public void setForNowPlus(Duration durationFromNow) {
-      Instant target = getCurrentTime().plus(durationFromNow);
-      verifyTargetTime(target);
+    public void setRelative() {
+      Instant target;
+      Instant now = getCurrentTime();
+      if (period.equals(Duration.ZERO)) {
+        target = now.plus(offset);
+      } else {
+        long millisSinceStart = now.plus(offset).getMillis() % period.getMillis();
+        target = millisSinceStart == 0 ? now : now.plus(period).minus(millisSinceStart);
+      }
+      target = minTargetAndGcTime(target);
       setUnderlyingTimer(target);
     }
 
+    @Override
+    public Timer offset(Duration offset) {
+      this.offset = offset;
+      return this;
+    }
+
+    @Override
+    public Timer align(Duration period) {
+      this.period = period;
+      return this;
+    }
+
+    /**
+     * For event time timers the target time should be prior to window GC time. So it return
+     * min(time to set, GC Time of window).
+     */
+    private Instant minTargetAndGcTime(Instant target) {
+      if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+        Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+        if (target.isAfter(windowExpiry)) {
+          return windowExpiry;
+        }
+      }
+      return target;
+    }
+
     /**
      * Ensures that the target time is reasonable. For event time timers this means that the
      * time should be prior to window GC time.
@@ -836,7 +871,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
       if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
         throw new IllegalStateException(
             "Cannot only set relative timers in processing time domain."
-                + " Use #setForNowPlus(Duration)");
+                + " Use #setRelative()");
       }
     }
 
@@ -867,5 +902,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
               String.format("Timer created for unknown time domain %s", spec.getTimeDomain()));
       }
     }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 3e404ad..9b63bab 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -388,7 +388,7 @@ public class SimpleDoFnRunnerTest {
 
     @ProcessElement
     public void process(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
-      timer.setForNowPlus(TIMER_OFFSET);
+      timer.offset(TIMER_OFFSET).setRelative();
     }
 
     @OnTimer(TIMER_ID)

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0368476..eab08f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -385,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
    *    public void processElement(
    *        ProcessContext c,
    *       {@literal @TimerId("my-timer-id") Timer myTimer}) {
-   *      myTimer.setForNowPlus(Duration.standardSeconds(...));
+   *      myTimer.offset(Duration.standardSeconds(...)).setRelative();
    *    }
    *
    *   {@literal @OnTimer("my-timer-id")}

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
index 45a2a66..9727969 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
@@ -49,19 +49,30 @@ public interface Timer {
    *
    * <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is be unpredictable, since processing
    * time timers are ignored after a window has expired. Instead, it is recommended to use
-   * {@link #setForNowPlus(Duration)}.
+   * {@link #setRelative()}.
    */
-  void set(Instant instant);
-
-  /**
-   * Sets or resets the time relative to the current time in the timer's {@link TimeDomain} at which
-   * this it should fire. If the timer was already set, resets it to the new requested time.
-   */
-  void setForNowPlus(Duration durationFromNow);
+  void set(Instant absoluteTime);
 
   /**
    * Unsets this timer. It is permitted to {@code cancel()} whether or not the timer was actually
    * set.
    */
   void cancel();
+
+  /**
+   * Sets the timer relative to the current time, according to any offset and alignment specified.
+   * Using {@link #offset(Duration)} and {@link #align(Duration)}.
+   */
+  void setRelative();
+
+  /**
+   * Set the align offset.
+   */
+  Timer offset(Duration offset);
+
+  /**
+   * Aligns a timestamp to the next boundary of {@code period}.
+   */
+  Timer align(Duration period);
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d4475c9..1c919d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2582,7 +2582,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
             context.output(3);
           }
 
@@ -2599,6 +2599,36 @@ public class ParDoTest implements Serializable {
 
   @Test
   @Category({ValidatesRunner.class, UsesTimersInParDo.class})
+  public void testEventTimeTimerAlignBounded() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+        new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
+            context.output(KV.of(3, context.timestamp()));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(KV.of(42, context.timestamp()));
+          }
+        };
+
+    PCollection<KV<Integer, Instant>> output =
+        pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(KV.of(3, BoundedWindow.TIMESTAMP_MIN_VALUE),
+        KV.of(42, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1774)));
+    pipeline.run();
+  }
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTimersInParDo.class})
   public void testTimerReceivedInOriginalWindow() throws Exception {
     final String timerId = "foo";
 
@@ -2610,7 +2640,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
           }
 
           @OnTimer(timerId)
@@ -2814,7 +2844,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
             context.output(3);
           }
 
@@ -2848,7 +2878,7 @@ public class ParDoTest implements Serializable {
 
           @ProcessElement
           public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
-            timer.setForNowPlus(Duration.standardSeconds(1));
+            timer.offset(Duration.standardSeconds(1)).setRelative();
             context.output(3);
           }
 
@@ -2871,6 +2901,81 @@ public class ParDoTest implements Serializable {
   }
 
   @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+  public void testEventTimeTimerAlignUnbounded() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+        new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
+            context.output(KV.of(3, context.timestamp()));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(KV.of(42, context.timestamp()));
+          }
+        };
+
+    TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+        .of(StringUtf8Coder.of(), VarIntCoder.of()))
+        .advanceWatermarkTo(new Instant(5))
+        .addElements(KV.of("hello", 37))
+        .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1).plus(1)))
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(KV.of(3, new Instant(5)),
+        KV.of(42, new Instant(Duration.standardSeconds(1).minus(1).getMillis())));
+    pipeline.run();
+  }
+
+  @Test
+  @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+  public void testEventTimeTimerAlignAfterGcTimeUnbounded() throws Exception {
+    final String timerId = "foo";
+
+    DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+        new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+          @TimerId(timerId)
+          private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+          @ProcessElement
+          public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+            // This aligned time will exceed the END_OF_GLOBAL_WINDOW
+            timer.align(Duration.standardDays(1)).setRelative();
+            context.output(KV.of(3, context.timestamp()));
+          }
+
+          @OnTimer(timerId)
+          public void onTimer(OnTimerContext context) {
+            context.output(KV.of(42, context.timestamp()));
+          }
+        };
+
+    TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+        .of(StringUtf8Coder.of(), VarIntCoder.of()))
+        // See GlobalWindow,
+        // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))
+        .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)))
+        .addElements(KV.of("hello", 37))
+        .advanceWatermarkToInfinity();
+
+    PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
+    PAssert.that(output).containsInAnyOrder(
+        KV.of(3, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))),
+        KV.of(42, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))));
+    pipeline.run();
+  }
+
+  @Test
   public void testWithOutputTagsDisplayData() {
     DoFn<String, String> fn = new DoFn<String, String>() {
       @ProcessElement


[3/3] beam git commit: This closes #2273: Add align and offset to Timer

Posted by ke...@apache.org.
This closes #2273: Add align and offset to Timer

  Fix initial watermark of DoFnOperator in Flink runner
  [BEAM-1727] Add align and offset to Timer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9fffa7ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9fffa7ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9fffa7ef

Branch: refs/heads/master
Commit: 9fffa7efac634eebecf802626c6d7c88e3d60be5
Parents: e2aa889 7ece164
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 4 13:27:27 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:27:27 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformMatchersTest.java    |   2 +-
 .../beam/runners/core/SimpleDoFnRunner.java     |  44 +++++++-
 .../beam/runners/core/SimpleDoFnRunnerTest.java |   2 +-
 .../wrappers/streaming/DoFnOperator.java        |  20 +++-
 .../org/apache/beam/sdk/transforms/DoFn.java    |   2 +-
 .../java/org/apache/beam/sdk/util/Timer.java    |  27 +++--
 .../apache/beam/sdk/transforms/ParDoTest.java   | 113 ++++++++++++++++++-
 7 files changed, 185 insertions(+), 25 deletions(-)
----------------------------------------------------------------------