Posted to issues@flink.apache.org by "Nicholas Jiang (Jira)" <ji...@apache.org> on 2022/03/28 06:33:00 UTC

[jira] [Closed] (FLINK-22888) Matches results may be wrong when using notNext as the last part of the pattern with Window

     [ https://issues.apache.org/jira/browse/FLINK-22888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Nicholas Jiang closed FLINK-22888.
----------------------------------

> Matches results may be wrong when using notNext as the last part of the pattern with Window
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-22888
>                 URL: https://issues.apache.org/jira/browse/FLINK-22888
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / CEP
>    Affects Versions: 1.9.0
>            Reporter: Yue Ma
>            Assignee: Nicholas Jiang
>            Priority: Minor
>
> The pattern looks like this (pseudocode):
> Pattern.begin("start").where(record == "a")
>             .notNext("notNext").where(record == "b")
>             .within(Time.milliseconds(5))
>  
> If only a single event *"a"* arrives within the 5 milliseconds, I think this *"a"* should be emitted as a correct match the next time advanceTime is called.
> In practice, however, CEP treats this "a" as a timed-out partial match instead.
> {code:java}
> @Test
> public void testNoNextWithWindow() throws Exception {
>    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
>    // (Event, timestamp)
>    DataStream<Event> input = env.fromElements(
>       Tuple2.of(new Event(1, "start", 1.0), 5L),
>       // last element for high final watermark
>       Tuple2.of(new Event(5, "final", 5.0), 100L)
>    ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
>       @Override
>       public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
>          return element.f1;
>       }
>       @Override
>       public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
>          return new Watermark(lastElement.f1 - 5);
>       }
>    }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
>       @Override
>       public Event map(Tuple2<Event, Long> value) throws Exception {
>          return value.f0;
>       }
>    });
>    Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
>       @Override
>       public boolean filter(Event value) throws Exception {
>          return value.getName().equals("start");
>       }
>    }).notNext("middle").where(new SimpleCondition<Event>() {
>       @Override
>       public boolean filter(Event value) throws Exception {
>          return value.getName().equals("middle");
>       }
>    }).within(Time.milliseconds(5L));
>    DataStream<String> result = CEP.pattern(input, pattern).select(
>       new PatternSelectFunction<Event, String>() {
>          @Override
>          public String select(Map<String, List<Event>> pattern) {
>             StringBuilder builder = new StringBuilder();
>             builder.append(pattern.get("start").get(0).getId());
>             return builder.toString();
>          }
>       }
>    );
>    List<String> resultList = new ArrayList<>();
>    DataStreamUtils.collect(result).forEachRemaining(resultList::add);
>    resultList.sort(String::compareTo);
>    assertEquals(Arrays.asList("1"), resultList);
> }
> {code}
>  
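
The expectation described in the report can be sketched without Flink. The following is only an illustrative model of the behaviour the reporter asks for, not the CEP library's implementation: the class NotNextWindowSketch, the Event type, and the method expectedMatches are hypothetical names, events are assumed to arrive in timestamp order, and the window is assumed to close once the watermark reaches start timestamp + window.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, Flink-free model of the semantics the reporter expects for
// begin("a").notNext("b").within(window). Not the CEP library's code.
final class NotNextWindowSketch {

    /** Hypothetical event: a name plus an event-time timestamp in milliseconds. */
    static final class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /**
     * Returns every "a" that the reporter would expect as a match:
     * no "b" directly follows it inside the window, and the watermark
     * has reached the end of the window (assumed: start + window).
     */
    static List<Event> expectedMatches(List<Event> events, long windowMillis, long watermark) {
        List<Event> matches = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            Event current = events.get(i);
            if (!current.name.equals("a")) {
                continue;
            }
            long deadline = current.timestamp + windowMillis;
            Event next = (i + 1 < events.size()) ? events.get(i + 1) : null;
            // notNext is violated only by a "b" that immediately follows within the window.
            boolean rejected = next != null
                    && next.name.equals("b")
                    && next.timestamp < deadline;
            // Until the watermark passes the deadline, a "b" could still arrive.
            boolean windowClosed = watermark >= deadline;
            if (!rejected && windowClosed) {
                matches.add(current);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Mirrors the shape of the test input: one start event, then a late final event.
        List<Event> events = Arrays.asList(new Event("a", 5L), new Event("final", 100L));
        for (Event match : expectedMatches(events, 5L, 95L)) {
            System.out.println("match: " + match.name + "@" + match.timestamp);
        }
    }
}
```

In this model the lone "a" is reported as a match once the window has closed, which is the outcome the test above asserts, whereas the report says CEP classifies it as a timeout.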



--
This message was sent by Atlassian Jira
(v8.20.1#820001)