You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/05/04 20:27:50 UTC
[1/3] beam git commit: Fix initial watermark of DoFnOperator in Flink
runner
Repository: beam
Updated Branches:
refs/heads/master e2aa8892e -> 9fffa7efa
Fix initial watermark of DoFnOperator in Flink runner
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/7ece1647
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/7ece1647
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/7ece1647
Branch: refs/heads/master
Commit: 7ece1647d6aeab4544d994a3a00360919920a978
Parents: 4f93492
Author: JingsongLi <lz...@aliyun.com>
Authored: Tue May 2 17:36:46 2017 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:27:05 2017 -0700
----------------------------------------------------------------------
.../wrappers/streaming/DoFnOperator.java | 20 ++++++++++++++------
1 file changed, 14 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/7ece1647/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
index 16bf5d2..518d6be 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java
@@ -198,8 +198,8 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
public void open() throws Exception {
super.open();
- currentInputWatermark = Long.MIN_VALUE;
- currentOutputWatermark = Long.MIN_VALUE;
+ setCurrentInputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
+ setCurrentOutputWatermark(BoundedWindow.TIMESTAMP_MIN_VALUE.getMillis());
sideInputReader = NullSideInputReader.of(sideInputs);
@@ -429,18 +429,18 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
@Override
public void processWatermark1(Watermark mark) throws Exception {
if (keyCoder == null) {
- this.currentInputWatermark = mark.getTimestamp();
+ setCurrentInputWatermark(mark.getTimestamp());
long potentialOutputWatermark =
Math.min(getPushbackWatermarkHold(), currentInputWatermark);
if (potentialOutputWatermark > currentOutputWatermark) {
- currentOutputWatermark = potentialOutputWatermark;
+ setCurrentOutputWatermark(potentialOutputWatermark);
output.emitWatermark(new Watermark(currentOutputWatermark));
}
} else {
// fireTimers, so we need startBundle.
pushbackDoFnRunner.startBundle();
- this.currentInputWatermark = mark.getTimestamp();
+ setCurrentInputWatermark(mark.getTimestamp());
// hold back by the pushed back values waiting for side inputs
long actualInputWatermark = Math.min(getPushbackWatermarkHold(), mark.getTimestamp());
@@ -454,7 +454,7 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
long potentialOutputWatermark = Math.min(currentInputWatermark, combinedWatermarkHold);
if (potentialOutputWatermark > currentOutputWatermark) {
- currentOutputWatermark = potentialOutputWatermark;
+ setCurrentOutputWatermark(potentialOutputWatermark);
output.emitWatermark(new Watermark(currentOutputWatermark));
}
pushbackDoFnRunner.finishBundle();
@@ -608,6 +608,14 @@ public class DoFnOperator<InputT, FnOutputT, OutputT>
timerData.getTimestamp(), timerData.getDomain());
}
+ private void setCurrentInputWatermark(long currentInputWatermark) {
+ this.currentInputWatermark = currentInputWatermark;
+ }
+
+ private void setCurrentOutputWatermark(long currentOutputWatermark) {
+ this.currentOutputWatermark = currentOutputWatermark;
+ }
+
/**
* Factory for creating an {@link DoFnRunners.OutputManager} from
* a Flink {@link Output}.
[2/3] beam git commit: [BEAM-1727] Add align and offset to Timer
Posted by ke...@apache.org.
[BEAM-1727] Add align and offset to Timer
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f934923
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f934923
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f934923
Branch: refs/heads/master
Commit: 4f934923d28798dfe7cd18c86ff4bcf8eebc27e5
Parents: e2aa889
Author: JingsongLi <lz...@aliyun.com>
Authored: Mon Mar 20 12:12:31 2017 +0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:27:05 2017 -0700
----------------------------------------------------------------------
.../construction/PTransformMatchersTest.java | 2 +-
.../beam/runners/core/SimpleDoFnRunner.java | 44 +++++++-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +-
.../org/apache/beam/sdk/transforms/DoFn.java | 2 +-
.../java/org/apache/beam/sdk/util/Timer.java | 27 +++--
.../apache/beam/sdk/transforms/ParDoTest.java | 113 ++++++++++++++++++-
6 files changed, 171 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index e7d4c64..bb1b1cd 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -193,7 +193,7 @@ public class PTransformMatchersTest implements Serializable {
@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
- timer.setForNowPlus(Duration.standardSeconds(1));
+ timer.offset(Duration.standardSeconds(1)).setRelative();
context.output(3);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 8a3e25f..7f29a6f 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -788,6 +788,8 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
private final StateNamespace namespace;
private final String timerId;
private final TimerSpec spec;
+ private Duration period = Duration.ZERO;
+ private Duration offset = Duration.ZERO;
public TimerInternalsTimer(
BoundedWindow window,
@@ -812,12 +814,45 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
}
@Override
- public void setForNowPlus(Duration durationFromNow) {
- Instant target = getCurrentTime().plus(durationFromNow);
- verifyTargetTime(target);
+ public void setRelative() {
+ Instant target;
+ Instant now = getCurrentTime();
+ if (period.equals(Duration.ZERO)) {
+ target = now.plus(offset);
+ } else {
+ long millisSinceStart = now.plus(offset).getMillis() % period.getMillis();
+ target = millisSinceStart == 0 ? now : now.plus(period).minus(millisSinceStart);
+ }
+ target = minTargetAndGcTime(target);
setUnderlyingTimer(target);
}
+ @Override
+ public Timer offset(Duration offset) {
+ this.offset = offset;
+ return this;
+ }
+
+ @Override
+ public Timer align(Duration period) {
+ this.period = period;
+ return this;
+ }
+
+ /**
+ * For event time timers the target time should be prior to window GC time, so this returns
+ * min(time to set, GC time of window).
+ */
+ private Instant minTargetAndGcTime(Instant target) {
+ if (TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
+ Instant windowExpiry = window.maxTimestamp().plus(allowedLateness);
+ if (target.isAfter(windowExpiry)) {
+ return windowExpiry;
+ }
+ }
+ return target;
+ }
+
/**
* Ensures that the target time is reasonable. For event time timers this means that the
* time should be prior to window GC time.
@@ -836,7 +871,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
if (!TimeDomain.EVENT_TIME.equals(spec.getTimeDomain())) {
throw new IllegalStateException(
"Cannot only set relative timers in processing time domain."
- + " Use #setForNowPlus(Duration)");
+ + " Use #setRelative()");
}
}
@@ -867,5 +902,6 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out
String.format("Timer created for unknown time domain %s", spec.getTimeDomain()));
}
}
+
}
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
index 3e404ad..9b63bab 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleDoFnRunnerTest.java
@@ -388,7 +388,7 @@ public class SimpleDoFnRunnerTest {
@ProcessElement
public void process(ProcessContext context, @TimerId(TIMER_ID) Timer timer) {
- timer.setForNowPlus(TIMER_OFFSET);
+ timer.offset(TIMER_OFFSET).setRelative();
}
@OnTimer(TIMER_ID)
http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
index 0368476..eab08f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java
@@ -385,7 +385,7 @@ public abstract class DoFn<InputT, OutputT> implements Serializable, HasDisplayD
* public void processElement(
* ProcessContext c,
* {@literal @TimerId("my-timer-id") Timer myTimer}) {
- * myTimer.setForNowPlus(Duration.standardSeconds(...));
+ * myTimer.offset(Duration.standardSeconds(...)).setRelative();
* }
*
* {@literal @OnTimer("my-timer-id")}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
index 45a2a66..9727969 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/Timer.java
@@ -49,19 +49,30 @@ public interface Timer {
*
* <p>For {@link TimeDomain#PROCESSING_TIME}, the behavior is be unpredictable, since processing
* time timers are ignored after a window has expired. Instead, it is recommended to use
- * {@link #setForNowPlus(Duration)}.
+ * {@link #setRelative()}.
*/
- void set(Instant instant);
-
- /**
- * Sets or resets the time relative to the current time in the timer's {@link TimeDomain} at which
- * this it should fire. If the timer was already set, resets it to the new requested time.
- */
- void setForNowPlus(Duration durationFromNow);
+ void set(Instant absoluteTime);
/**
* Unsets this timer. It is permitted to {@code cancel()} whether or not the timer was actually
* set.
*/
void cancel();
+
+ /**
+ * Sets the timer relative to the current time, according to any offset and alignment specified
+ * using {@link #offset(Duration)} and {@link #align(Duration)}.
+ */
+ void setRelative();
+
+ /**
+ * Sets the offset that {@link #setRelative()} applies to the current time.
+ */
+ Timer offset(Duration offset);
+
+ /**
+ * Aligns the target time of {@link #setRelative()} to the next boundary of {@code period}.
+ */
+ Timer align(Duration period);
+
}
http://git-wip-us.apache.org/repos/asf/beam/blob/4f934923/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index d4475c9..1c919d4 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2582,7 +2582,7 @@ public class ParDoTest implements Serializable {
@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
- timer.setForNowPlus(Duration.standardSeconds(1));
+ timer.offset(Duration.standardSeconds(1)).setRelative();
context.output(3);
}
@@ -2599,6 +2599,36 @@ public class ParDoTest implements Serializable {
@Test
@Category({ValidatesRunner.class, UsesTimersInParDo.class})
+ public void testEventTimeTimerAlignBounded() throws Exception {
+ final String timerId = "foo";
+
+ DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+ new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+ timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
+ context.output(KV.of(3, context.timestamp()));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context) {
+ context.output(KV.of(42, context.timestamp()));
+ }
+ };
+
+ PCollection<KV<Integer, Instant>> output =
+ pipeline.apply(Create.of(KV.of("hello", 37))).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(KV.of(3, BoundedWindow.TIMESTAMP_MIN_VALUE),
+ KV.of(42, BoundedWindow.TIMESTAMP_MIN_VALUE.plus(1774)));
+ pipeline.run();
+ }
+
+ @Test
+ @Category({ValidatesRunner.class, UsesTimersInParDo.class})
public void testTimerReceivedInOriginalWindow() throws Exception {
final String timerId = "foo";
@@ -2610,7 +2640,7 @@ public class ParDoTest implements Serializable {
@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
- timer.setForNowPlus(Duration.standardSeconds(1));
+ timer.offset(Duration.standardSeconds(1)).setRelative();
}
@OnTimer(timerId)
@@ -2814,7 +2844,7 @@ public class ParDoTest implements Serializable {
@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
- timer.setForNowPlus(Duration.standardSeconds(1));
+ timer.offset(Duration.standardSeconds(1)).setRelative();
context.output(3);
}
@@ -2848,7 +2878,7 @@ public class ParDoTest implements Serializable {
@ProcessElement
public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
- timer.setForNowPlus(Duration.standardSeconds(1));
+ timer.offset(Duration.standardSeconds(1)).setRelative();
context.output(3);
}
@@ -2871,6 +2901,81 @@ public class ParDoTest implements Serializable {
}
@Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ public void testEventTimeTimerAlignUnbounded() throws Exception {
+ final String timerId = "foo";
+
+ DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+ new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+ timer.align(Duration.standardSeconds(1)).offset(Duration.millis(1)).setRelative();
+ context.output(KV.of(3, context.timestamp()));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context) {
+ context.output(KV.of(42, context.timestamp()));
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+ .of(StringUtf8Coder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(5))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkTo(new Instant(0).plus(Duration.standardSeconds(1).plus(1)))
+ .advanceWatermarkToInfinity();
+
+ PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(KV.of(3, new Instant(5)),
+ KV.of(42, new Instant(Duration.standardSeconds(1).minus(1).getMillis())));
+ pipeline.run();
+ }
+
+ @Test
+ @Category({NeedsRunner.class, UsesTimersInParDo.class, UsesTestStream.class})
+ public void testEventTimeTimerAlignAfterGcTimeUnbounded() throws Exception {
+ final String timerId = "foo";
+
+ DoFn<KV<String, Integer>, KV<Integer, Instant>> fn =
+ new DoFn<KV<String, Integer>, KV<Integer, Instant>>() {
+
+ @TimerId(timerId)
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void processElement(ProcessContext context, @TimerId(timerId) Timer timer) {
+ // This aligned time will exceed the END_OF_GLOBAL_WINDOW
+ timer.align(Duration.standardDays(1)).setRelative();
+ context.output(KV.of(3, context.timestamp()));
+ }
+
+ @OnTimer(timerId)
+ public void onTimer(OnTimerContext context) {
+ context.output(KV.of(42, context.timestamp()));
+ }
+ };
+
+ TestStream<KV<String, Integer>> stream = TestStream.create(KvCoder
+ .of(StringUtf8Coder.of(), VarIntCoder.of()))
+ // See GlobalWindow,
+ // END_OF_GLOBAL_WINDOW is TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))
+ .advanceWatermarkTo(BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1)))
+ .addElements(KV.of("hello", 37))
+ .advanceWatermarkToInfinity();
+
+ PCollection<KV<Integer, Instant>> output = pipeline.apply(stream).apply(ParDo.of(fn));
+ PAssert.that(output).containsInAnyOrder(
+ KV.of(3, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))),
+ KV.of(42, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1))));
+ pipeline.run();
+ }
+
+ @Test
public void testWithOutputTagsDisplayData() {
DoFn<String, String> fn = new DoFn<String, String>() {
@ProcessElement
[3/3] beam git commit: This closes #2273: Add align and offset to
Timer
Posted by ke...@apache.org.
This closes #2273: Add align and offset to Timer
Fix initial watermark of DoFnOperator in Flink runner
[BEAM-1727] Add align and offset to Timer
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9fffa7ef
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9fffa7ef
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9fffa7ef
Branch: refs/heads/master
Commit: 9fffa7efac634eebecf802626c6d7c88e3d60be5
Parents: e2aa889 7ece164
Author: Kenneth Knowles <kl...@google.com>
Authored: Thu May 4 13:27:27 2017 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Thu May 4 13:27:27 2017 -0700
----------------------------------------------------------------------
.../construction/PTransformMatchersTest.java | 2 +-
.../beam/runners/core/SimpleDoFnRunner.java | 44 +++++++-
.../beam/runners/core/SimpleDoFnRunnerTest.java | 2 +-
.../wrappers/streaming/DoFnOperator.java | 20 +++-
.../org/apache/beam/sdk/transforms/DoFn.java | 2 +-
.../java/org/apache/beam/sdk/util/Timer.java | 27 +++--
.../apache/beam/sdk/transforms/ParDoTest.java | 113 ++++++++++++++++++-
7 files changed, 185 insertions(+), 25 deletions(-)
----------------------------------------------------------------------