You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/09/05 12:22:43 UTC

[4/5] flink git commit: [FLINK-7564] [table] Fix watermark semantics in rowtime unbounded OVER window.

[FLINK-7564] [table] Fix watermark semantics in rowtime unbounded OVER window.

This closes #4633.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b7b0d400
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b7b0d400
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b7b0d400

Branch: refs/heads/master
Commit: b7b0d400a8e630eb81d7ec51f112a4ded5c1b03f
Parents: 09344aa
Author: Xingcan Cui <xi...@gmail.com>
Authored: Fri Sep 1 09:16:21 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Tue Sep 5 13:53:59 2017 +0200

----------------------------------------------------------------------
 .../flink/table/runtime/aggregate/RowTimeUnboundedOver.scala   | 2 +-
 .../flink/table/runtime/harness/OverWindowHarnessTest.scala    | 6 ++++++
 2 files changed, 7 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b7b0d400/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
index c8236a3..27d307b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/RowTimeUnboundedOver.scala
@@ -114,7 +114,7 @@ abstract class RowTimeUnboundedOver(
     val curWatermark = ctx.timerService().currentWatermark()
 
     // discard late record
-    if (timestamp >= curWatermark) {
+    if (timestamp > curWatermark) {
       // ensure every key just registers one timer
       ctx.timerService.registerEventTimeTimer(curWatermark + 1)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b7b0d400/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
index ba36e18..def1972 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/OverWindowHarnessTest.scala
@@ -712,6 +712,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(20000L: JLong, "ccc", 1L: JLong), change = true))) // test for late data
+
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(
@@ -845,6 +848,9 @@ class OverWindowHarnessTest extends HarnessTestBase{
 
     testHarness.processWatermark(20000)
     testHarness.processElement(new StreamRecord(
+      CRow(Row.of(20000L: JLong, "ccc", 2L: JLong), change = true))) // test for late data
+
+    testHarness.processElement(new StreamRecord(
       CRow(Row.of(20001L: JLong, "ccc", 1L: JLong), change = true))) // clean-up 5000
     testHarness.setProcessingTime(2500)
     testHarness.processElement(new StreamRecord(