You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/09/17 16:15:24 UTC

flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

Repository: flink
Updated Branches:
  refs/heads/master 312e08534 -> 258b385f4


[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.

This closes #4632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/258b385f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/258b385f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/258b385f

Branch: refs/heads/master
Commit: 258b385f4f23a1c20c85d999d1ae2bf5b0be45db
Parents: 312e085
Author: Yestin <87...@qq.com>
Authored: Thu Aug 31 17:09:30 2017 -0400
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Sun Sep 17 18:11:43 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/operator/AbstractKeyedCEPPatternOperator.java  | 4 ++--
 .../java/org/apache/flink/cep/operator/CEPOperatorTest.java  | 8 +++++---
 2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index ae2d7e4..01aad76 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -170,10 +170,10 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 			IN value = element.getValue();
 
 			// In event-time processing we assume correctness of the watermark.
-			// Events with timestamp smaller than the last seen watermark are considered late.
+			// Events with timestamp smaller than or equal with the last seen watermark are considered late.
 			// Late events are put in a dedicated side output, if the user has specified one.
 
-			if (timestamp >= lastWatermark) {
+			if (timestamp > lastWatermark) {
 
 				// we have an event with a valid timestamp, so
 				// we buffer it until we receive the proper watermark.

http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 9eb60de..ed8b923 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -604,6 +604,7 @@ public class CEPOperatorTest extends TestLogger {
 			verifyWatermark(harness.getOutput().poll(), 11L);
 			verifyWatermark(harness.getOutput().poll(), 12L);
 
+			// this is a late event, because timestamp(12) = last watermark(12)
 			harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
 			harness.processElement(new StreamRecord<>(endEvent2, 13L));
 			harness.processWatermark(20L);
@@ -613,8 +614,9 @@ public class CEPOperatorTest extends TestLogger {
 			assertTrue(!operator2.hasNonEmptyPQ(42));
 			assertEquals(0L, harness.numEventTimeTimers());
 
+			assertEquals(3, harness.getOutput().size());
 			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
-			verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
+
 			verifyWatermark(harness.getOutput().poll(), 20L);
 			verifyWatermark(harness.getOutput().poll(), 21L);
 		} finally {
@@ -871,9 +873,9 @@ public class CEPOperatorTest extends TestLogger {
 			harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
 			harness.processElement(new StreamRecord<>(startEvent2, 4));
 			harness.processWatermark(5L);
-			harness.processElement(new StreamRecord<>(nextOne, 6));
+			harness.processElement(new StreamRecord<>(nextOne, 7));
 			harness.processElement(new StreamRecord<>(endEvent, 8));
-			harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
+			harness.processElement(new StreamRecord<Event>(middleEvent4, 6));
 			harness.processWatermark(100L);
 
 			List<List<Event>> resultingPatterns = new ArrayList<>();