Posted to issues@flink.apache.org by dawidwys <gi...@git.apache.org> on 2017/09/15 09:56:10 UTC

[GitHub] flink issue #4632: [FLINK-7563] [cep] Fix watermark semantics in cep and rel...

Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4632
  
    Hi @yestinchen,
    Thanks for working on this issue. Unfortunately, your changes do not fix the watermark semantics. Have a look at this failing test:
    
    	@Test
    	public void testCEPWatermarkSemantic() throws Exception {
    		Event startEvent = new Event(42, "start", 1.0);
    		SubEvent middleEvent = new SubEvent(42, "foo2", 2.0, 10.0);
    		Event endEvent = new Event(42, "end", 1.0);
    
    		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);
    
    		try (OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator)) {
    			harness.open();
    
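    			// The initial watermark is forwarded; nothing is buffered yet.
    			harness.processWatermark(0L);
    			verifyWatermark(harness.getOutput().poll(), 0L);
    
    			harness.processElement(new StreamRecord<>(startEvent, 1L));
    			harness.processElement(new StreamRecord<>(middleEvent, 2L));
    			harness.processElement(new StreamRecord<>(endEvent, 3L));
    
    			// All three events should only be buffered; the NFA has not seen them yet.
    			assertTrue(operator.hasNonEmptyPQ(42));
    			assertTrue(!operator.hasNonEmptyNFA(42));
    
    			harness.processWatermark(3L);
    			verifyWatermark(harness.getOutput().poll(), 3L);
    
    			// Watermark 3: the event with timestamp 3 is expected to stay buffered,
    			// while the earlier events have reached the NFA.
    			assertTrue(operator.hasNonEmptyPQ(42));
    			assertTrue(operator.hasNonEmptyNFA(42));
    
    			// Watermark 4: the buffer is expected to be drained completely.
    			harness.processWatermark(4L);
    			assertTrue(!operator.hasNonEmptyPQ(42));
    
    			// The match is expected in the output before watermark 4.
    			verifyPattern(harness.getOutput().poll(), startEvent, middleEvent, endEvent);
    			verifyWatermark(harness.getOutput().poll(), 4L);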
    		}
    	}
    
    To fix it, you also have to change the condition in `AbstractKeyedCEPPatternOperator::onEventTime:232`. Instead of:
    
    	while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
    
    it should be:
    
    	while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() < timerService.currentWatermark()) {
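
    To make the difference between the two conditions concrete, here is a minimal standalone sketch. It is not Flink code: the class `WatermarkDrainSketch` and the helper `drain` are hypothetical names, and a plain `PriorityQueue<Long>` stands in for the operator's buffered timestamps. It only models which timestamps each loop condition lets through for a given watermark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.PriorityQueue;

/** Standalone illustration only; all names here are hypothetical, not Flink API. */
public class WatermarkDrainSketch {

    /**
     * Removes and returns the buffered timestamps admitted by the loop condition.
     * strict == true  models `peek() <  watermark` (the proposed condition)
     * strict == false models `peek() <= watermark` (the existing condition)
     */
    static List<Long> drain(PriorityQueue<Long> sortedTimestamps, long watermark, boolean strict) {
        List<Long> processed = new ArrayList<>();
        while (!sortedTimestamps.isEmpty()
                && (strict
                        ? sortedTimestamps.peek() < watermark
                        : sortedTimestamps.peek() <= watermark)) {
            processed.add(sortedTimestamps.poll());
        }
        return processed;
    }

    public static void main(String[] args) {
        PriorityQueue<Long> strictQueue = new PriorityQueue<>(Arrays.asList(1L, 2L, 3L));
        PriorityQueue<Long> inclusiveQueue = new PriorityQueue<>(Arrays.asList(1L, 2L, 3L));

        // With `<`, the timestamp equal to the watermark stays buffered.
        System.out.println(drain(strictQueue, 3L, true));     // prints [1, 2]
        // With `<=`, it is drained together with the earlier ones.
        System.out.println(drain(inclusiveQueue, 3L, false)); // prints [1, 2, 3]
        // The next watermark releases the remaining timestamp under `<`.
        System.out.println(drain(strictQueue, 4L, true));     // prints [3]
    }
}
```

    With the same timestamps as in the test (1, 2, 3), only the `<` variant leaves one entry buffered after watermark 3, which is what the `hasNonEmptyPQ(42)` assertion after `processWatermark(3L)` expects.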

