You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/09/17 16:15:24 UTC
flink git commit: [FLINK-7563] [cep] Fix watermark semantics in cep
and related tests.
Repository: flink
Updated Branches:
refs/heads/master 312e08534 -> 258b385f4
[FLINK-7563] [cep] Fix watermark semantics in cep and related tests.
This closes #4632
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/258b385f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/258b385f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/258b385f
Branch: refs/heads/master
Commit: 258b385f4f23a1c20c85d999d1ae2bf5b0be45db
Parents: 312e085
Author: Yestin <87...@qq.com>
Authored: Thu Aug 31 17:09:30 2017 -0400
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Sun Sep 17 18:11:43 2017 +0200
----------------------------------------------------------------------
.../flink/cep/operator/AbstractKeyedCEPPatternOperator.java | 4 ++--
.../java/org/apache/flink/cep/operator/CEPOperatorTest.java | 8 +++++---
2 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index ae2d7e4..01aad76 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -170,10 +170,10 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
IN value = element.getValue();
// In event-time processing we assume correctness of the watermark.
- // Events with timestamp smaller than the last seen watermark are considered late.
+ // Events with timestamp smaller than or equal to the last seen watermark are considered late.
// Late events are put in a dedicated side output, if the user has specified one.
- if (timestamp >= lastWatermark) {
+ if (timestamp > lastWatermark) {
// we have an event with a valid timestamp, so
// we buffer it until we receive the proper watermark.
http://git-wip-us.apache.org/repos/asf/flink/blob/258b385f/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 9eb60de..ed8b923 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -604,6 +604,7 @@ public class CEPOperatorTest extends TestLogger {
verifyWatermark(harness.getOutput().poll(), 11L);
verifyWatermark(harness.getOutput().poll(), 12L);
+ // this is a late event, because its timestamp (12) is equal to the last watermark (12)
harness.processElement(new StreamRecord<Event>(middleEvent3, 12L));
harness.processElement(new StreamRecord<>(endEvent2, 13L));
harness.processWatermark(20L);
@@ -613,8 +614,9 @@ public class CEPOperatorTest extends TestLogger {
assertTrue(!operator2.hasNonEmptyPQ(42));
assertEquals(0L, harness.numEventTimeTimers());
+ assertEquals(3, harness.getOutput().size());
verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent2, endEvent2);
- verifyPattern(harness.getOutput().poll(), startEvent2, middleEvent3, endEvent2);
+
verifyWatermark(harness.getOutput().poll(), 20L);
verifyWatermark(harness.getOutput().poll(), 21L);
} finally {
@@ -871,9 +873,9 @@ public class CEPOperatorTest extends TestLogger {
harness.processElement(new StreamRecord<Event>(middleEvent2, 3));
harness.processElement(new StreamRecord<>(startEvent2, 4));
harness.processWatermark(5L);
- harness.processElement(new StreamRecord<>(nextOne, 6));
+ harness.processElement(new StreamRecord<>(nextOne, 7));
harness.processElement(new StreamRecord<>(endEvent, 8));
- harness.processElement(new StreamRecord<Event>(middleEvent4, 5));
+ harness.processElement(new StreamRecord<Event>(middleEvent4, 6));
harness.processWatermark(100L);
List<List<Event>> resultingPatterns = new ArrayList<>();