You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/09/15 09:57:00 UTC

[jira] [Commented] (FLINK-7563) Fix watermark semantics in CEP operators

    [ https://issues.apache.org/jira/browse/FLINK-7563?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16167644#comment-16167644 ] 

ASF GitHub Bot commented on FLINK-7563:
---------------------------------------

Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4632
  
    Hi @yestinchen,
    Thanks for working on this issue. Unfortunately your changes do not fix the semantics. Have a look at this failing test:
    
    	@Test
    	public void testCEPWatermarkSemantic() throws Exception {
    		Event startEvent = new Event(42, "start", 1.0);
    		SubEvent middleEvent = new SubEvent(42, "foo2", 2.0, 10.0);
    		Event endEvent = new Event(42, "end", 1.0);
    
    		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);
    
    		try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator)) {
    			harness.open();
    
    			harness.processWatermark(0L);
    			verifyWatermark(harness.getOutput().poll(), 0L);
    
    			harness.processElement(new StreamRecord<>(startEvent, 1L));
    			harness.processElement(new StreamRecord<>(middleEvent, 2L));
    			harness.processElement(new StreamRecord<>(endEvent, 3L));
    
    			assertTrue(operator.hasNonEmptyPQ(42));
    			assertTrue(!operator.hasNonEmptyNFA(42));
    
    			harness.processWatermark(3L);
    			verifyWatermark(harness.getOutput().poll(), 3L);
    
    			assertTrue(operator.hasNonEmptyPQ(42));
    			assertTrue(operator.hasNonEmptyNFA(42));
    
    			harness.processWatermark(4L);
    			assertTrue(!operator.hasNonEmptyPQ(42));
    
    			verifyPattern(harness.getOutput().poll(), startEvent, middleEvent, endEvent);
    			verifyWatermark(harness.getOutput().poll(), 4L);
    		}
    	}
    
    To fix it you also have to fix the condition in `AbstractKeyedCEPPatternOperator::onEventTime:232`. Instead of:
    
    	while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
    
    it should be 
    
    	while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() < timerService.currentWatermark()) {


> Fix watermark semantics in CEP operators
> ----------------------------------------
>
>                 Key: FLINK-7563
>                 URL: https://issues.apache.org/jira/browse/FLINK-7563
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>            Reporter: Aljoscha Krettek
>            Assignee: Yueting Chen
>            Priority: Blocker
>             Fix For: 1.4.0, 1.3.3
>
>
> See https://lists.apache.org/thread.html/3541e72ba3842192e58a487e54c2817f6b2b9d12af5fee97af83e5df@%3Cdev.flink.apache.org%3E for reference.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)