Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2017/09/15 09:56:10 UTC
[GitHub] flink issue #4632: [FLINK-7563] [cep] Fix watermark semantics in cep and rel...
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/4632
Hi @yestinchen,
Thanks for working on this issue. Unfortunately, your changes do not fix the semantics. Have a look at this failing test:
    @Test
    public void testCEPWatermarkSemantic() throws Exception {
        Event startEvent = new Event(42, "start", 1.0);
        SubEvent middleEvent = new SubEvent(42, "foo2", 2.0, 10.0);
        Event endEvent = new Event(42, "end", 1.0);

        SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);

        try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator)) {
            harness.open();

            harness.processWatermark(0L);
            verifyWatermark(harness.getOutput().poll(), 0L);

            harness.processElement(new StreamRecord<>(startEvent, 1L));
            harness.processElement(new StreamRecord<>(middleEvent, 2L));
            harness.processElement(new StreamRecord<>(endEvent, 3L));

            assertTrue(operator.hasNonEmptyPQ(42));
            assertTrue(!operator.hasNonEmptyNFA(42));

            harness.processWatermark(3L);
            verifyWatermark(harness.getOutput().poll(), 3L);

            assertTrue(operator.hasNonEmptyPQ(42));
            assertTrue(operator.hasNonEmptyNFA(42));

            harness.processWatermark(4L);
            assertTrue(!operator.hasNonEmptyPQ(42));

            verifyPattern(harness.getOutput().poll(), startEvent, middleEvent, endEvent);
            verifyWatermark(harness.getOutput().poll(), 4L);
        }
    }
To fix it, you also have to change the condition in `AbstractKeyedCEPPatternOperator::onEventTime:232`. Instead of:

    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {

it should be:

    while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() < timerService.currentWatermark()) {
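To see why the comparison matters for the test above, here is a minimal standalone sketch. It is not Flink code: the class `WatermarkDrainSketch` and the method `drain` are hypothetical names, and a plain `PriorityQueue<Long>` stands in for the operator's buffered timestamps. It only shows which timestamps each condition would release for a given watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration only; this is not part of the Flink code base.
public class WatermarkDrainSketch {

    // Polls buffered timestamps while the loop condition holds.
    // strict == true  mirrors "peek() <  watermark" (the proposed condition),
    // strict == false mirrors "peek() <= watermark" (the current condition).
    static List<Long> drain(PriorityQueue<Long> sortedTimestamps, long watermark, boolean strict) {
        List<Long> processed = new ArrayList<>();
        while (!sortedTimestamps.isEmpty()
                && (strict
                    ? sortedTimestamps.peek() < watermark
                    : sortedTimestamps.peek() <= watermark)) {
            processed.add(sortedTimestamps.poll());
        }
        return processed;
    }

    public static void main(String[] args) {
        // Same timestamps as the three events in the test: 1, 2, 3.
        PriorityQueue<Long> strictQueue = new PriorityQueue<>(Arrays.asList(1L, 2L, 3L));
        PriorityQueue<Long> nonStrictQueue = new PriorityQueue<>(Arrays.asList(1L, 2L, 3L));

        // Watermark 3 with "<": timestamp 3 stays buffered.
        System.out.println(drain(strictQueue, 3L, true));      // prints [1, 2]
        System.out.println(strictQueue.size());                // prints 1

        // Watermark 3 with "<=": everything is released at once.
        System.out.println(drain(nonStrictQueue, 3L, false));  // prints [1, 2, 3]
        System.out.println(nonStrictQueue.size());             // prints 0
    }
}
```

With the strict comparison, the element with timestamp 3 is still buffered after watermark 3 and is only released by watermark 4, which is what the `hasNonEmptyPQ(42)` assertions in the test expect.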