You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/24 15:00:12 UTC
flink git commit: [FLINK-7388] Don't set processing time as timestamp
in ProcessFunction.onTimer()
Repository: flink
Updated Branches:
refs/heads/master 8c89f3c6b -> 904c95104
[FLINK-7388] Don't set processing time as timestamp in ProcessFunction.onTimer()
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/904c9510
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/904c9510
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/904c9510
Branch: refs/heads/master
Commit: 904c95104d70f483bc79f98a35b64ec46c7f6a80
Parents: 8c89f3c
Author: Bowen Li <bo...@gmail.com>
Authored: Sun Oct 8 15:57:51 2017 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 24 16:59:42 2017 +0200
----------------------------------------------------------------------
docs/dev/stream/operators/process_function.md | 7 +++++++
.../streaming/api/operators/KeyedProcessOperator.java | 2 +-
.../api/operators/co/KeyedCoProcessOperator.java | 2 +-
.../streaming/api/operators/KeyedProcessOperatorTest.java | 8 ++++----
.../api/operators/co/KeyedCoProcessOperatorTest.java | 10 +++++-----
5 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/docs/dev/stream/operators/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index 9f32359..a52c5bf 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -236,3 +236,10 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
</div>
{% top %}
+
+
+**NOTE:** Before Flink 1.4.0, when called from a processing-time timer, the `ProcessFunction.onTimer()` method set
+the current processing time as the event-time timestamp. This behavior is very subtle and might not be noticed by users. It is
+harmful because processing-time timestamps are non-deterministic and not aligned with watermarks. Moreover, user-implemented logic
+that depends on this wrong timestamp is very likely to be unintentionally faulty, so we decided to fix it. Upon upgrading to 1.4.0, Flink jobs
+that use this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index 7d7ee86..0f4b4f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -79,7 +79,7 @@ public class KeyedProcessOperator<K, IN, OUT>
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
+ collector.eraseTimestamp();
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
index d53e6e8..e9402cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -99,7 +99,7 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
@Override
public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
- collector.setAbsoluteTimestamp(timer.getTimestamp());
+ collector.eraseTimestamp();
onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
onTimerContext.timer = timer;
userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index 8e24c46..3ad8d34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -157,7 +157,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
expectedOutput.add(new StreamRecord<>(17));
- expectedOutput.add(new StreamRecord<>(1777, 5L));
+ expectedOutput.add(new StreamRecord<>(1777));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -232,8 +232,8 @@ public class KeyedProcessOperatorTest extends TestLogger {
expectedOutput.add(new StreamRecord<>("INPUT:17"));
expectedOutput.add(new StreamRecord<>("INPUT:42"));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+ expectedOutput.add(new StreamRecord<>("STATE:17"));
+ expectedOutput.add(new StreamRecord<>("STATE:42"));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -272,7 +272,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
- expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+ expectedOutput.add(new StreamRecord<>("PROC:1777"));
expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
expectedOutput.add(new Watermark(6));
http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
index 3f590ff..13c6a19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
@@ -177,8 +177,8 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
expectedOutput.add(new StreamRecord<>("INPUT1:17"));
expectedOutput.add(new StreamRecord<>("INPUT2:18"));
- expectedOutput.add(new StreamRecord<>("1777", 5L));
- expectedOutput.add(new StreamRecord<>("1777", 6L));
+ expectedOutput.add(new StreamRecord<>("1777"));
+ expectedOutput.add(new StreamRecord<>("1777"));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -266,8 +266,8 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
expectedOutput.add(new StreamRecord<>("INPUT1:17"));
expectedOutput.add(new StreamRecord<>("INPUT2:42"));
- expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
- expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+ expectedOutput.add(new StreamRecord<>("STATE:17"));
+ expectedOutput.add(new StreamRecord<>("STATE:42"));
TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -316,7 +316,7 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
- expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+ expectedOutput.add(new StreamRecord<>("PROC:1777"));
expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
expectedOutput.add(new Watermark(6));