You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2017/10/24 15:00:12 UTC

flink git commit: [FLINK-7388] Don't set processing time as timestamp in ProcessFunction.onTimer()

Repository: flink
Updated Branches:
  refs/heads/master 8c89f3c6b -> 904c95104


[FLINK-7388] Don't set processing time as timestamp in ProcessFunction.onTimer()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/904c9510
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/904c9510
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/904c9510

Branch: refs/heads/master
Commit: 904c95104d70f483bc79f98a35b64ec46c7f6a80
Parents: 8c89f3c
Author: Bowen Li <bo...@gmail.com>
Authored: Sun Oct 8 15:57:51 2017 -0700
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Tue Oct 24 16:59:42 2017 +0200

----------------------------------------------------------------------
 docs/dev/stream/operators/process_function.md             |  7 +++++++
 .../streaming/api/operators/KeyedProcessOperator.java     |  2 +-
 .../api/operators/co/KeyedCoProcessOperator.java          |  2 +-
 .../streaming/api/operators/KeyedProcessOperatorTest.java |  8 ++++----
 .../api/operators/co/KeyedCoProcessOperatorTest.java      | 10 +++++-----
 5 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/docs/dev/stream/operators/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index 9f32359..a52c5bf 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -236,3 +236,10 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 </div>
 
 {% top %}
+
+
+**NOTE:** Before Flink 1.4.0, when called from a processing-time timer, the `ProcessFunction.onTimer()` method set
+the current processing time as the event-time timestamp. This behavior is very subtle and might not be noticed by users. It is
+harmful because processing-time timestamps are non-deterministic and not aligned with watermarks. Besides, user-implemented logic
+that depends on this wrong timestamp is highly likely to be unintentionally faulty, so we decided to fix it. Upon upgrading to 1.4.0, Flink jobs
+that use this incorrect event-time timestamp will fail, and users should adapt their jobs to the correct logic.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
index 7d7ee86..0f4b4f5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -79,7 +79,7 @@ public class KeyedProcessOperator<K, IN, OUT>
 
 	@Override
 	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		collector.eraseTimestamp();
 		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
 		onTimerContext.timer = timer;
 		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
index d53e6e8..e9402cf 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -99,7 +99,7 @@ public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
 
 	@Override
 	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		collector.eraseTimestamp();
 		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
 		onTimerContext.timer = timer;
 		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);

http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
index 8e24c46..3ad8d34 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -157,7 +157,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
 		expectedOutput.add(new StreamRecord<>(17));
-		expectedOutput.add(new StreamRecord<>(1777, 5L));
+		expectedOutput.add(new StreamRecord<>(1777));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
@@ -232,8 +232,8 @@ public class KeyedProcessOperatorTest extends TestLogger {
 
 		expectedOutput.add(new StreamRecord<>("INPUT:17"));
 		expectedOutput.add(new StreamRecord<>("INPUT:42"));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new StreamRecord<>("STATE:17"));
+		expectedOutput.add(new StreamRecord<>("STATE:42"));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
@@ -272,7 +272,7 @@ public class KeyedProcessOperatorTest extends TestLogger {
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
-		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("PROC:1777"));
 		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
 		expectedOutput.add(new Watermark(6));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/904c9510/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
index 3f590ff..13c6a19 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
@@ -177,8 +177,8 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
 
 		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
 		expectedOutput.add(new StreamRecord<>("INPUT2:18"));
-		expectedOutput.add(new StreamRecord<>("1777", 5L));
-		expectedOutput.add(new StreamRecord<>("1777", 6L));
+		expectedOutput.add(new StreamRecord<>("1777"));
+		expectedOutput.add(new StreamRecord<>("1777"));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
@@ -266,8 +266,8 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
 
 		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
 		expectedOutput.add(new StreamRecord<>("INPUT2:42"));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new StreamRecord<>("STATE:17"));
+		expectedOutput.add(new StreamRecord<>("STATE:42"));
 
 		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
 
@@ -316,7 +316,7 @@ public class KeyedCoProcessOperatorTest extends TestLogger {
 
 		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
 
-		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("PROC:1777"));
 		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
 		expectedOutput.add(new Watermark(6));