Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/09/15 09:57:00 UTC
[jira] [Commented] (FLINK-7563) Fix watermark semantics in CEP operators
[ https://issues.apache.org/jira/browse/FLINK-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167644#comment-16167644 ]
ASF GitHub Bot commented on FLINK-7563:
---------------------------------------
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/4632
Hi @yestinchen,
Thanks for working on this issue. Unfortunately, your changes do not fix the semantics. Have a look at this failing test:
    @Test
    public void testCEPWatermarkSemantic() throws Exception {
        Event startEvent = new Event(42, "start", 1.0);
        SubEvent middleEvent = new SubEvent(42, "foo2", 2.0, 10.0);
        Event endEvent = new Event(42, "end", 1.0);

        SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.open();

            harness.processWatermark(0L);
            verifyWatermark(harness.getOutput().poll(), 0L);

            harness.processElement(new StreamRecord<>(startEvent, 1L));
            harness.processElement(new StreamRecord<>(middleEvent, 2L));
            harness.processElement(new StreamRecord<>(endEvent, 3L));

            assertTrue(operator.hasNonEmptyPQ(42));
            assertTrue(!operator.hasNonEmptyNFA(42));

            harness.processWatermark(3L);
            verifyWatermark(harness.getOutput().poll(), 3L);

            assertTrue(operator.hasNonEmptyPQ(42));
            assertTrue(operator.hasNonEmptyNFA(42));

            harness.processWatermark(4L);
            assertTrue(!operator.hasNonEmptyPQ(42));

            verifyPattern(harness.getOutput().poll(), startEvent, middleEvent, endEvent);
            verifyWatermark(harness.getOutput().poll(), 4L);
        }
    }
To fix it, you also have to change the condition in `AbstractKeyedCEPPatternOperator::onEventTime:232`. Instead of:

    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {

it should be:

    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() < timerService.currentWatermark()) {
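To make the difference between the two comparisons concrete, here is a minimal standalone sketch. It is not Flink code: the class `WatermarkDrainSketch`, the method `drain`, and the `inclusive` flag are hypothetical names introduced only for illustration, and a plain `PriorityQueue<Long>` stands in for the operator's buffered timestamps. It uses the same timestamps (1, 2, 3) and watermarks (3, 4) as the test above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Standalone illustration only; not Flink code. All names are hypothetical.
public class WatermarkDrainSketch {

    // Removes and returns the buffered timestamps admitted by the loop
    // condition for the given watermark.
    // inclusive = true mimics `peek() <= watermark`, false mimics `peek() < watermark`.
    public static List<Long> drain(PriorityQueue<Long> sortedTimestamps, long watermark, boolean inclusive) {
        List<Long> processed = new ArrayList<>();
        while (!sortedTimestamps.isEmpty()
                && (inclusive
                        ? sortedTimestamps.peek() <= watermark
                        : sortedTimestamps.peek() < watermark)) {
            processed.add(sortedTimestamps.poll());
        }
        return processed;
    }

    public static void main(String[] args) {
        PriorityQueue<Long> inclusiveQueue = new PriorityQueue<>(Arrays.asList(1L, 2L, 3L));
        PriorityQueue<Long> strictQueue = new PriorityQueue<>(Arrays.asList(1L, 2L, 3L));

        // `<=` at watermark 3 drains everything, leaving the queue empty.
        System.out.println(drain(inclusiveQueue, 3L, true));  // [1, 2, 3]

        // `<` at watermark 3 keeps timestamp 3 buffered ...
        System.out.println(drain(strictQueue, 3L, false));    // [1, 2]

        // ... and releases it only once the watermark reaches 4.
        System.out.println(drain(strictQueue, 4L, false));    // [3]
    }
}
```

With the strict comparison, the element with timestamp 3 is still buffered after watermark 3 and is only drained at watermark 4, which is the behaviour the `hasNonEmptyPQ` assertions in the test expect.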
> Fix watermark semantics in CEP operators
> ----------------------------------------
>
> Key: FLINK-7563
> URL: https://issues.apache.org/jira/browse/FLINK-7563
> Project: Flink
> Issue Type: Bug
> Components: CEP
> Reporter: Aljoscha Krettek
> Assignee: Yueting Chen
> Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> See https://lists.apache.org/thread.html/3541e72ba3842192e58a487e54c2817f6b2b9d12af5fee97af83e5df@%3Cdev.flink.apache.org%3E for reference.