You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Nicholas Jiang (Jira)" <ji...@apache.org> on 2022/03/28 06:22:00 UTC
[jira] [Resolved] (FLINK-22888) Matches results may be wrong when using notNext as the last part of the pattern with Window
[ https://issues.apache.org/jira/browse/FLINK-22888?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nicholas Jiang resolved FLINK-22888.
------------------------------------
Resolution: Won't Fix
> Matches results may be wrong when using notNext as the last part of the pattern with Window
> -------------------------------------------------------------------------------------------
>
> Key: FLINK-22888
> URL: https://issues.apache.org/jira/browse/FLINK-22888
> Project: Flink
> Issue Type: Bug
> Components: Library / CEP
> Affects Versions: 1.9.0
> Reporter: Yue Ma
> Assignee: Nicholas Jiang
> Priority: Minor
>
> the pattern is like
> Pattern.begin("start").where(records == "a")
> .notNext("notNext").where(records == "b")
> .withIn(5milliseconds).
>
> If there is only one event *"a"* in 5 milliseconds. I think this *“a”* should be output as the correct result of the match next time in advanceTime.
> But in the actual operation of CEP. This “a” will be treated as matching timeout data
> {code:java}
> // code placeholder
> @Test
> public void testNoNextWithWindow() throws Exception {
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> // (Event, timestamp)
> DataStream<Event> input = env.fromElements(
> Tuple2.of(new Event(1, "start", 1.0), 5L),
> // last element for high final watermark
> Tuple2.of(new Event(5, "final", 5.0), 100L)
> ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
> @Override
> public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
> return element.f1;
> }
> @Override
> public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
> return new Watermark(lastElement.f1 - 5);
> }
> }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
> @Override
> public Event map(Tuple2<Event, Long> value) throws Exception {
> return value.f0;
> }
> });
> Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
> @Override
> public boolean filter(Event value) throws Exception {
> return value.getName().equals("start");
> }
> }).notNext("middle").where(new SimpleCondition<Event>() {
> @Override
> public boolean filter(Event value) throws Exception {
> return value.getName().equals("middle");
> }
> }).within(Time.milliseconds(5L));
> DataStream<String> result = CEP.pattern(input, pattern).select(
> new PatternSelectFunction<Event, String>() {
> @Override
> public String select(Map<String, List<Event>> pattern) {
> StringBuilder builder = new StringBuilder();
> builder.append(pattern.get("start").get(0).getId());
> return builder.toString();
> }
> }
> );
> List<String> resultList = new ArrayList<>();
> DataStreamUtils.collect(result).forEachRemaining(resultList::add);
> resultList.sort(String::compareTo);
> assertEquals(Arrays.asList("1"), resultList);
> }
> {code}
>
--
This message was sent by Atlassian Jira
(v8.20.1#820001)