Posted to issues@flink.apache.org by yestinchen <gi...@git.apache.org> on 2017/07/14 08:48:10 UTC

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

GitHub user yestinchen opened a pull request:

    https://github.com/apache/flink/pull/4331

    [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yestinchen/flink FLINK-7169

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4331.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4331
    
----
commit 28f2e0ab2b6fd38864017fc64d2c76a65c8f7574
Author: Yestin <87...@qq.com>
Date:   2017-07-14T08:41:51Z

    [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CEP

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132175300
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep;
    +
    +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
    +import org.apache.flink.cep.pattern.MalformedPatternException;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * After match skip tests.
    + */
    +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
    +
    +	private String resultPath;
    +	private String expected;
    +
    +	private String lateEventPath;
    +	private String expectedLateEvents;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	@Before
    +	public void before() throws Exception {
    +		resultPath = tempFolder.newFile().toURI().toString();
    +		expected = "";
    +
    +		lateEventPath = tempFolder.newFile().toURI().toString();
    +		expectedLateEvents = "";
    +	}
    +
    +	@After
    +	public void after() throws Exception {
    +		compareResultsByLinesInMemory(expected, resultPath);
    +		compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
    +	}
    +
    +	private PatternSelectFunction<Event, String> newIdSelectFunction(String ... names) {
    +		return new PatternSelectFunction<Event, String>() {
    +
    +			@Override
    +			public String select(Map<String, List<Event>> pattern) {
    +				StringBuilder builder = new StringBuilder();
    +				for (String name: names) {
    +					for (Event e : pattern.get(name)) {
    +						builder.append(e.getId()).append(",");
    +					}
    +				}
    +				return builder.toString();
    +			}
    +		};
    +	}
    +
    +	@Test
    +	public void testSkipToNext() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n2,3,4,\n3,4,5,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipPastLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToFirst() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(
    +				AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "end"))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n3,4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n4,5,6,7,";
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToLastWithEmptyException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "middle")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			}
    +		).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    --- End diff --
    
    same as above


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129296661
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    --- End diff --
    
    There is no notion of a ROW in the CEP library.


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132173777
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +				Pattern<T, ?> pattern = currentPattern;
    +				while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +					pattern = pattern.getPrevious();
    +				}
    +				// pattern name match check.
    +				if (pattern == null) {
    +					throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +						"can not be found in the given Pattern");
    +				} else {
    +					// can not be used with optional states.
    +					if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    --- End diff --
    
    We could also allow this; see the comment below.
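The `while` loop quoted above calls `pattern.getName()` before the `pattern == null` check. This assumes, as with `Pattern.begin`, that the first pattern has no predecessor. Under that assumption, a name that is not found makes the walk step past the first pattern and fail with a `NullPointerException` instead of the intended `MalformedPatternException`. Below is a minimal, self-contained sketch of a null-safe lookup. `PatternNode`, `MalformedPatternError` and `SkipTargetLookup` are hypothetical stand-ins, not Flink classes, and the optional-quantifier question is left out.

```java
import java.util.Objects;

/** Hypothetical stand-in for a CEP pattern: a name plus a link to its predecessor. */
final class PatternNode {
    final String name;
    final PatternNode previous; // null for the first ("begin") pattern

    PatternNode(String name, PatternNode previous) {
        this.name = name;
        this.previous = previous;
    }
}

/** Hypothetical stand-in for MalformedPatternException. */
class MalformedPatternError extends RuntimeException {
    MalformedPatternError(String message) {
        super(message);
    }
}

final class SkipTargetLookup {
    /**
     * Walks backwards from the last pattern and returns the one with the given name.
     * The null check is part of the loop condition, so a missing name yields a
     * descriptive exception instead of a NullPointerException.
     */
    static PatternNode findSkipTarget(PatternNode last, String targetName) {
        PatternNode node = last;
        while (node != null && !Objects.equals(node.name, targetName)) {
            node = node.previous;
        }
        if (node == null) {
            throw new MalformedPatternError("the pattern name specified in AfterMatchSkipStrategy "
                + "can not be found in the given Pattern");
        }
        return node;
    }
}
```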


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895583
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    --- End diff --
    
    You could also add a brief description in the class javadoc for each of the strategies.
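Here is a sketch of what per-strategy descriptions could look like. It uses the `*_EVENT` names from the later revision of the class. The wording is an assumption inferred from the expected results in `AfterMatchSkipITCase`, not the final Flink Javadoc. For example, six `a` events with `times(3)` give `1,2,3` and `4,5,6` under `SKIP_PAST_LAST_EVENT`.

```java
/**
 * Indicates where pattern matching resumes after a match has been found.
 * Examples refer to six "a" events (ids 1 to 6) and a pattern "start" with times(3).
 */
enum SkipStrategy {
    /** Resume after the last event of the match, so matches never overlap (1,2,3 then 4,5,6). */
    SKIP_PAST_LAST_EVENT,
    /** Resume at the event after the first event of the match (1,2,3 then 2,3,4 and so on). */
    SKIP_TO_NEXT_EVENT,
    /** Resume at the first event mapped to the given pattern name; requires that name. */
    SKIP_TO_FIRST,
    /** Resume at the last event mapped to the given pattern name; requires that name. */
    SKIP_TO_LAST
}
```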


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132175179
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
    @@ -0,0 +1,431 @@
    +	@Test(expected = MalformedPatternException.class)
    --- End diff --
    
    Those tests should only exercise `NFACompiler`, so they belong in `NFACompilerTest`.


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    @dawidwys Thanks for the review.
    
    Problem 1 is easy to fix: we can just start a new match process if the only remaining computation state reaches stopState.
    Problem 2 cannot be avoided with the current approach: it is impossible to know whether there are potential matches.
    
    I think the best way to implement this correctly is to try to start a new match process after processing each event, and to discard unfinished match processes after a successful match according to the skip strategy. To do that, we need to keep the logical order of the events, which is the original idea I proposed.
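The approach described above can be sketched as a small simulation: start a new match process at every event, then, once a match completes, discard unfinished processes according to the skip strategy. It is not Flink's NFA. It assumes fixed-length patterns built only from `times(n)` with strict contiguity. `SkipSimulation`, `Step`, `times` and `run` are hypothetical names. Each strategy only changes the position before which pending match processes are dropped.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical simulation of after-match skipping; not Flink's NFA. */
final class SkipSimulation {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /** One position of the pattern: the sub-pattern name and the condition an event must meet. */
    static final class Step {
        final String name;
        final Predicate<String> condition;

        Step(String name, Predicate<String> condition) {
            this.name = name;
            this.condition = condition;
        }
    }

    /** Expands a sub-pattern quantified with times(n) into n consecutive steps. */
    static List<Step> times(String name, Predicate<String> condition, int n) {
        List<Step> steps = new ArrayList<>();
        for (int k = 0; k < n; k++) {
            steps.add(new Step(name, condition));
        }
        return steps;
    }

    /** Runs the pattern over the events and returns every match as a list of 1-based event ids. */
    static List<List<Integer>> run(List<String> events, List<Step> pattern, Strategy strategy, String skipTo) {
        List<List<Integer>> matches = new ArrayList<>();
        // Each unfinished match process holds the 0-based indices of the events it consumed.
        List<List<Integer>> partials = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            String event = events.get(i);
            partials.add(new ArrayList<>()); // start a new match process at every event
            List<Integer> completed = null;
            for (Iterator<List<Integer>> it = partials.iterator(); it.hasNext(); ) {
                List<Integer> partial = it.next();
                if (!pattern.get(partial.size()).condition.test(event)) {
                    it.remove(); // strict contiguity: one failed step ends the process
                    continue;
                }
                partial.add(i);
                if (partial.size() == pattern.size()) {
                    it.remove();
                    completed = partial; // at most one: processes started at different events
                }
            }
            if (completed != null) {
                List<Integer> ids = new ArrayList<>();
                for (int idx : completed) {
                    ids.add(idx + 1);
                }
                matches.add(ids);
                int resumeAt = resumePosition(completed, pattern, strategy, skipTo);
                // Discard unfinished processes that started before the resume position.
                partials.removeIf(p -> p.get(0) < resumeAt);
            }
        }
        return matches;
    }

    private static int resumePosition(List<Integer> match, List<Step> pattern, Strategy strategy, String skipTo) {
        if (strategy == Strategy.SKIP_TO_NEXT_EVENT) {
            return match.get(0) + 1;
        }
        if (strategy == Strategy.SKIP_PAST_LAST_EVENT) {
            return match.get(match.size() - 1) + 1;
        }
        // SKIP_TO_FIRST / SKIP_TO_LAST: first or last event mapped to the named sub-pattern.
        int first = -1;
        int last = -1;
        for (int k = 0; k < pattern.size(); k++) {
            if (pattern.get(k).name.equals(skipTo)) {
                if (first < 0) {
                    first = match.get(k);
                }
                last = match.get(k);
            }
        }
        if (first < 0) {
            throw new IllegalArgumentException("unknown pattern name: " + skipTo);
        }
        return strategy == Strategy.SKIP_TO_FIRST ? first : last;
    }
}
```

Under these assumptions the four strategies reproduce the expected strings of `AfterMatchSkipITCase`. For example, six `a` events with `times("start", ..., 3)` give `[[1, 2, 3], [2, 3, 4], [3, 4, 5], [4, 5, 6]]` under `SKIP_TO_NEXT_EVENT` and `[[1, 2, 3], [4, 5, 6]]` under `SKIP_PAST_LAST_EVENT`.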
    
    As for your general notes, I have some ideas:
    
    1. I agree that Oracle's specification is designed for bounded data. But MATCH_RECOGNIZE on unbounded data is very similar to the bounded case, since all events are processed one by one and no bound information is needed. As for **_empty matches_**, I think we can just use Oracle's definition:
    > Some patterns permit empty matches. For example:
    > PATTERN (A*)
    > can be matched by zero or more rows that are mapped to A.
    > An empty match does not map any rows to primary row pattern variables; nevertheless, an empty match has a starting row. For example, there can be an empty match at the first row of a row pattern partition, an empty match at the second row of a row pattern partition, etc. An empty match is assigned a sequential match number, based on the ordinal position of its starting row, the same as any other match.
    
    2. I am also uncomfortable with the RuntimeExceptions, but these exceptions are important for keeping the skip semantics correct. I understand your main concern is that exceptions will stop the matching process, which is unacceptable for an online streaming service. To address this, I think we can introduce a default strategy (SKIP_TO_NEXT_EVENT, for example). If one of these exceptions occurs, we can use the default strategy to continue the matching process and switch back to the configured strategy after a successful match. We can also add a switch that lets the user decide whether to enable this feature.
    
    3. I still think it is useful to support these skip strategies. I don't know why Esper does not support them.
    
    4. Thanks for the related information. I took a brief look at that PR, which is very similar to this one. I wonder why it was closed without being merged into master?
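The fallback proposed in point 2 could look roughly like the sketch below. It treats two cases as errors, and both are assumptions. The first is a target pattern that matched no event, as with the empty optional `middle` in `testSkipToLastWithEmptyException`. The second is skipping back to the first event of the match, which would find the same match again. `FallbackSkip`, `resumePosition` and `fallbackEnabled` are hypothetical names, with `fallbackEnabled` standing in for the proposed switch.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of the proposed fallback; not part of the PR. */
final class FallbackSkip {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Returns the 0-based position at which matching resumes after a match.
     *
     * @param match           sub-pattern name mapped to the ascending positions of its events
     * @param start           position of the first event of the match
     * @param end             position of the last event of the match
     * @param strategy        the configured skip strategy
     * @param skipTo          sub-pattern name used by SKIP_TO_FIRST and SKIP_TO_LAST
     * @param fallbackEnabled user switch: fall back to SKIP_TO_NEXT_EVENT instead of throwing
     */
    static int resumePosition(Map<String, List<Integer>> match, int start, int end,
                              Strategy strategy, String skipTo, boolean fallbackEnabled) {
        if (strategy == Strategy.SKIP_TO_NEXT_EVENT) {
            return start + 1;
        }
        if (strategy == Strategy.SKIP_PAST_LAST_EVENT) {
            return end + 1;
        }
        List<Integer> positions = match.get(skipTo);
        int target = -1; // stays -1 if the named sub-pattern matched no event
        if (positions != null && !positions.isEmpty()) {
            target = strategy == Strategy.SKIP_TO_FIRST
                    ? positions.get(0)
                    : positions.get(positions.size() - 1);
        }
        // Assumed error cases: no event to skip to, or skipping back to the match start,
        // which would produce the same match again.
        if (target <= start) {
            if (!fallbackEnabled) {
                throw new IllegalStateException("cannot skip to pattern '" + skipTo + "'");
            }
            return start + 1; // default strategy, for this match only
        }
        return target;
    }
}
```

Because the fallback is decided per match, the configured strategy automatically applies again to the next match. Nothing has to be switched back explicitly.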
    
    Looking forward to your feedback. Thanks.


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895649
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>.
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// pattern name to skip to
    +	String patternName = null;
    +
    --- End diff --
    
    This can be `private`.



[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    Hi @yestinchen 
    
    Thanks for your feedback. 
    
    Right now, I just want to address the empty match issue. It is not easy to apply that definition to unbounded data. In the SQL specification, at the end of a partition we can deterministically decide whether a partial match is empty or not. That is not the case for unbounded data, as events that arrive later may make the partial matches empty. 
    
    Those are the nuances I think should be addressed first, but I agree there are many similarities.
    
    I will try to address the rest later in the day, but I think we should continue in the JIRA.




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170320
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -707,7 +789,7 @@ private boolean checkFilterCondition(ComputationState<T> computationState, Itera
     				result.put(key, values);
     			}
     
    -			for (T event: events) {
    +			for (T event : events) {
    --- End diff --
    
    Unrelated change



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170202
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -695,10 +778,9 @@ private boolean checkFilterCondition(ComputationState<T> computationState, Itera
     		}
     		// for a given computation state, we cannot have more than one matching patterns.
     		Preconditions.checkState(paths.size() == 1);
    -
    -		Map<String, List<T>> result = new HashMap<>();
    +		Map<String, List<T>> result = new LinkedHashMap<>();
     		Map<String, List<T>> path = paths.get(0);
    -		for (String key: path.keySet()) {
    +		for (String key : path.keySet()) {
    --- End diff --
    
    Unrelated change



[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    @dawidwys Thanks for the reviews and comments, please change the documentation during merge.
    And @kl0u , thanks for the reviews!



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130317411
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_EVENT:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, event));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, event);
    +								if (callLevel > 0) {
    --- End diff --
    
    Because we need to detect whether there is an infinite loop; I use callLevel to track that here.
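To illustrate the reply above: in the quoted SKIP_TO_FIRST branch, entering the target state creates a start computation state for the same event, so if that fresh computation can immediately enter the target again (for example, when the target is the first pattern), restarting would never terminate. A hypothetical sketch of detecting this with a call-level counter; the class name, the exception type and the `IntPredicate` standing in for the NFA transition logic are all assumptions, not the PR's code.

```java
import java.util.function.IntPredicate;

/** Hypothetical sketch of callLevel-based loop detection; not the PR's NFA code. */
final class SkipToFirstRestart {

    /** Thrown when a restart would re-enter the skip target on the same event forever. */
    static final class InfiniteLoopException extends RuntimeException {
        InfiniteLoopException(String message) {
            super(message);
        }
    }

    /**
     * Processes the current event at the given call level and returns how many
     * restarts it triggered.
     *
     * @param entersSkipTarget stand-in for the NFA transition logic: whether the
     *                         computation at this call level enters the SKIP_TO_FIRST
     *                         target state on the current event
     * @param callLevel        0 for regular processing, greater than 0 inside a restart
     */
    static int process(IntPredicate entersSkipTarget, int callLevel) {
        if (!entersSkipTarget.test(callLevel)) {
            return 0;
        }
        if (callLevel > 0) {
            // The fresh computation created by the previous restart entered the target
            // again without consuming a new event: restarting again would never end.
            throw new InfiniteLoopException("skip target re-entered at call level " + callLevel);
        }
        // Entering the target starts a fresh computation on the SAME event, one level deeper.
        return 1 + process(entersSkipTarget, callLevel + 1);
    }
}
```

Without the `callLevel > 0` check, `process(level -> true, 0)` would recurse until a `StackOverflowError`; with it, the problem surfaces as a dedicated exception at the second level.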



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130265354
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_EVENT:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, event));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, event);
    +								if (callLevel > 0) {
    --- End diff --
    
    Why is callLevel needed?



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130317270
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_EVENT:
    +							if (nextState.isFinal()) {
    --- End diff --
    
    Thanks for pointing it out.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895977
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>.
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// pattern name to skip to
    +	String patternName = null;
    +
    +	/**
    +	 * Skip to first *pattern*.
    +	 * @param patternName the pattern name to skip to
    +	 * @return
    +	 */
    +	public static AfterMatchSkipStrategy skipToFirst(String patternName) {
    +		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
    +	}
    +
    +	/**
    +	 * Skip to last *pattern*.
    +	 * @param patternName the pattern name to skip to
    +	 * @return
    +	 */
    +	public static AfterMatchSkipStrategy skipToLast(String patternName) {
    +		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
    +	}
    +
    +	/**
    +	 * Skip past last event.
    +	 * @return
    +	 */
    +	public static AfterMatchSkipStrategy skipPastLastEvent() {
    +		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
    +	}
    +
    +	/**
    +	 * Skip to next event.
    +	 * @return
    +	 */
    +	public static AfterMatchSkipStrategy skipToNextEvent() {
    +		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_NEXT_EVENT);
    +	}
    +
    +	private AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	private AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    +		if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
    +			throw new IllegalArgumentException("The patternName field can not be empty when SkipStrategy is " + strategy);
    +		}
    +		this.strategy = strategy;
    +		this.patternName = patternName;
    +	}
    +
    +	public SkipStrategy getStrategy() {
    +		return strategy;
    +	}
    +
    +	public String getPatternName() {
    +		return patternName;
    +	}
    +
    +	@Override
    +	public String toString() {
    --- End diff --
    
    If `patternName == null` then there is nothing to print, so it would be nice to adjust the `toString()` accordingly.
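A minimal sketch of the suggested `toString()`, keeping the output prefix the PR already uses (`AfterMatchStrategy{`) and the fields `private`, as suggested earlier in the review. `StrategyToString` is a hypothetical, trimmed-down stand-in for the PR's class so the example compiles on its own.

```java
/** Hypothetical trimmed-down strategy class; only toString() is of interest here. */
final class StrategyToString {

    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;
    // Only set for SKIP_TO_FIRST / SKIP_TO_LAST; null otherwise.
    private final String patternName;

    StrategyToString(SkipStrategy strategy, String patternName) {
        this.strategy = strategy;
        this.patternName = patternName;
    }

    @Override
    public String toString() {
        if (patternName == null) {
            // No pattern name to report, so leave the field out entirely.
            return "AfterMatchStrategy{strategy=" + strategy + '}';
        }
        return "AfterMatchStrategy{strategy=" + strategy + ", patternName=" + patternName + '}';
    }
}
```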



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132134129
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompatibilityUtil;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// fields
    +	String patternName = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    +		if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
    +			throw new IllegalArgumentException("the patternName field can not be empty when SkipStrategy is " + strategy);
    +		}
    +		this.strategy = strategy;
    +		this.patternName = patternName;
    +	}
    +
    +	public SkipStrategy getStrategy() {
    +		return strategy;
    +	}
    +
    +	public String getPatternName() {
    +		return patternName;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "AfterMatchStrategy{" +
    +			"strategy=" + strategy +
    +			", patternName=" + patternName +
    +			'}';
    +	}
    +
    +	/**
    +	 * Skip Strategy Enum.
    +	 */
    +	public enum SkipStrategy{
    +		SKIP_TO_NEXT_EVENT,
    +		SKIP_PAST_LAST_EVENT,
    +		SKIP_TO_FIRST,
    +		SKIP_TO_LAST
    +	}
    +
    +	/**
    +	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
    +	 */
    +	public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
    +
    +		private static final int VERSION = 1;
    +
    +		/**
    +		 * This empty constructor is required for deserializing the configuration.
    +		 */
    +		public AfterMatchSkipStrategyConfigSnapshot() {
    +		}
    +
    +		public AfterMatchSkipStrategyConfigSnapshot(
    +			TypeSerializer<SkipStrategy> enumSerializer,
    +			TypeSerializer<String> stringSerializer) {
    +
    +			super(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public int getVersion() {
    +			return VERSION;
    +		}
    +	}
    +
    +	/**
    +	 *  A {@link TypeSerializer} for the {@link AfterMatchSkipStrategy}.
    +	 */
    +	public static class AfterMatchSkipStrategySerializer extends TypeSerializer<AfterMatchSkipStrategy> {
    +
    +		private final TypeSerializer<SkipStrategy> enumSerializer;
    +		private final TypeSerializer<String> stringSerializer;
    +
    +		public AfterMatchSkipStrategySerializer(TypeSerializer<SkipStrategy> enumSerializer, TypeSerializer<String> stringSerializer) {
    +			this.enumSerializer = enumSerializer;
    +			this.stringSerializer = stringSerializer;
    +		}
    +
    +		@Override
    +		public boolean isImmutableType() {
    +			return false;
    +		}
    +
    +		@Override
    +		public TypeSerializer<AfterMatchSkipStrategy> duplicate() {
    +			return new AfterMatchSkipStrategySerializer(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy createInstance() {
    +			return new AfterMatchSkipStrategy();
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy copy(AfterMatchSkipStrategy from) {
    +			try {
    --- End diff --
    
    Why not just:
    
        new AfterMatchSkipStrategy(from.strategy, from.patternName)
    
    No need to serialize/deserialize.
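A sketch of the copy suggested above. `CopyableStrategy` is a hypothetical stand-in for `AfterMatchSkipStrategy`. Both fields are an enum constant and a `String`, which are immutable, so passing them to a new instance already yields an independent copy without a serialization round trip.

```java
import java.io.Serializable;

/** Hypothetical stand-in for AfterMatchSkipStrategy, reduced to what copy() needs. */
final class CopyableStrategy implements Serializable {

    private static final long serialVersionUID = 1L;

    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;
    private final String patternName;

    CopyableStrategy(SkipStrategy strategy, String patternName) {
        this.strategy = strategy;
        this.patternName = patternName;
    }

    SkipStrategy getStrategy() {
        return strategy;
    }

    String getPatternName() {
        return patternName;
    }

    /** Field-by-field copy; enum constants and Strings are immutable, so sharing them is safe. */
    static CopyableStrategy copy(CopyableStrategy from) {
        return new CopyableStrategy(from.strategy, from.patternName);
    }
}
```

If the fields are made `final` like this, the class becomes effectively immutable, and the serializer could arguably return `true` from `isImmutableType()`; whether that is appropriate depends on how the rest of the PR uses the class.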



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132386948
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +				Pattern<T, ?> pattern = currentPattern;
    +				while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +					pattern = pattern.getPrevious();
    +				}
    +				// pattern name match check.
    +				if (pattern == null) {
    +					throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +						"can not be found in the given Pattern");
    +				} else {
    +					// can not be used with optional states.
    +					if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    +						throw new MalformedPatternException("the AfterMatchSkipStrategy "
    +							+ afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
    +					}
    +				}
    +
    +				// start position check.
    +				if (pattern.getPrevious() == null) {
    --- End diff --
    
    Great, I'll just remove all those optional state checks.
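As quoted, the lookup loop in `checkPatternSkipStrategy()` calls `pattern.getName()` before the `pattern == null` check is reached, so an unknown pattern name would surface as a `NullPointerException` rather than the intended `MalformedPatternException`. A null-safe variant is sketched below. `PatternNode` is a hypothetical stand-in for `Pattern` (a name plus a link to the previous pattern), and `IllegalArgumentException` replaces `MalformedPatternException` only to keep the block standalone.

```java
/** Hypothetical stand-in for Pattern: a name plus a link to the previous pattern. */
final class PatternNode {

    private final String name;
    private final PatternNode previous;

    PatternNode(String name, PatternNode previous) {
        this.name = name;
        this.previous = previous;
    }

    String getName() {
        return name;
    }

    PatternNode getPrevious() {
        return previous;
    }

    /** Appends a new pattern after this one and returns it. */
    PatternNode next(String nextName) {
        return new PatternNode(nextName, this);
    }

    /**
     * Walks back from the last pattern to the one named in the skip strategy.
     * The null test comes first, so a missing name ends the loop instead of throwing an NPE.
     */
    static PatternNode findByName(PatternNode last, String patternName) {
        PatternNode pattern = last;
        while (pattern != null && !pattern.getName().equals(patternName)) {
            pattern = pattern.getPrevious();
        }
        if (pattern == null) {
            // The PR throws MalformedPatternException here.
            throw new IllegalArgumentException("the pattern name specified in AfterMatchSkipStrategy "
                + "can not be found in the given Pattern");
        }
        return pattern;
    }
}
```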



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896299
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---
    @@ -42,28 +43,29 @@
     	private static final long serialVersionUID = 5328573789532074581L;
     
     	public KeyedCEPPatternOperator(
    -			TypeSerializer<IN> inputSerializer,
    -			boolean isProcessingTime,
    -			TypeSerializer<KEY> keySerializer,
    -			NFACompiler.NFAFactory<IN> nfaFactory,
    -			boolean migratingFromOldKeyedOperator,
    -			EventComparator<IN> comparator) {
    +		TypeSerializer<IN> inputSerializer,
    --- End diff --
    
    Please revert the unrelated formatting changes.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132867483
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
     		return Tuple2.of(result, timeoutResult);
     	}
     
    +	private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
    +		Collection<Map<String, List<T>>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
    +		Set<T> discardEvents = new HashSet<>();
    +		switch(afterMatchSkipStrategy.getStrategy()) {
    +			case SKIP_TO_LAST:
    +				for (Map<String, List<T>> resultMap: matchedResult) {
    +					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
    +						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    +							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
    +							break;
    +						} else {
    +							discardEvents.addAll(keyMatches.getValue());
    +						}
    +					}
    +				}
    +				break;
    +			case SKIP_TO_FIRST:
    +				for (Map<String, List<T>> resultMap: matchedResult) {
    +					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
    +						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    +							break;
    +						} else {
    +							discardEvents.addAll(keyMatches.getValue());
    +						}
    +					}
    +				}
    +				break;
    +			case SKIP_PAST_LAST_EVENT:
    +				for (Map<String, List<T>> resultMap: matchedResult) {
    +					for (List<T> eventList: resultMap.values()) {
    +						discardEvents.addAll(eventList);
    +					}
    +				}
    +				break;
    +		}
    +		if (!discardEvents.isEmpty()) {
    +			List<ComputationState<T>> discardStates = new ArrayList<>();
    +			for (ComputationState<T> computationState : computationStates) {
    +				Map<String, List<T>> partialMatch = extractCurrentMatches(computationState);
    +				for (List<T> list: partialMatch.values()) {
    +					for (T e: list) {
    +						if (discardEvents.contains(e)) {
    +							// discard the computation state.
    +							eventSharedBuffer.release(
    +								NFAStateNameHandler.getOriginalNameFromInternal(
    +									computationState.getState().getName()),
    +								computationState.getEvent(),
    +								computationState.getTimestamp(),
    +								computationState.getCounter()
    +							);
    +							discardStates.add(computationState);
    --- End diff --
    
    Shouldn't there be a **break;** after **discardStates.add(computationState);**?
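One detail about the suggested `break`: in the quoted code, a plain `break` only leaves the innermost `for (T e: list)` loop. If another list in the same partial match also contains a discarded event, the state would still be released and added twice. A sketch using a labeled break is shown below. `DiscardStates` is hypothetical, partial matches are modeled as plain maps, and the `release` callback stands in for `eventSharedBuffer.release(...)`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of pruning partial matches; not the PR's NFA code. */
final class DiscardStates {

    /**
     * Returns the partial matches that contain at least one discarded event.
     * {@code release} is called exactly once per discarded partial match.
     */
    static <E> List<Map<String, List<E>>> discard(
            Collection<Map<String, List<E>>> partialMatches,
            Set<E> discardEvents,
            Consumer<Map<String, List<E>>> release) {
        List<Map<String, List<E>>> discarded = new ArrayList<>();
        for (Map<String, List<E>> partialMatch : partialMatches) {
            // Labeled loop: once this partial match is discarded, leave BOTH inner loops.
            scan:
            for (List<E> events : partialMatch.values()) {
                for (E e : events) {
                    if (discardEvents.contains(e)) {
                        release.accept(partialMatch);
                        discarded.add(partialMatch);
                        break scan;
                    }
                }
            }
        }
        return discarded;
    }
}
```

Extracting the inner search into a `containsAny(partialMatch, discardEvents)` helper would be an equally valid alternative to the label.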



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r135465794
  
    --- Diff: flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala ---
    @@ -511,4 +530,5 @@ object Pattern {
         */
       def begin[T, F <: T](pattern: Pattern[T, F]): GroupPattern[T, F] =
    --- End diff --
    
    Missing version with `AfterMatchSkipStrategy`. I will change it during merge, if you are ok with it.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132139143
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -322,6 +336,64 @@ public void resetNFAChanged() {
     
     		}
     
    +		Set<T> discardEvents = new HashSet<>();
    +		switch(afterMatchSkipStrategy.getStrategy()) {
    +			case SKIP_TO_LAST:
    +				for (Map<String, List<T>> resultMap: result) {
    +					boolean matched = false;
    +					for (String key: resultMap.keySet()) {
    +						if (key.equals(afterMatchSkipStrategy.getPatternName())) {
    +							matched = true;
    +							discardEvents.addAll(resultMap.get(key).subList(0, resultMap.get(key).size() - 1));
    +						} else if (!matched) {
    +							discardEvents.addAll(resultMap.get(key));
    +						}
    +					}
    +				}
    +				break;
    +			case SKIP_TO_FIRST:
    +				for (Map<String, List<T>> resultMap: result) {
    --- End diff --
    
    Similar to above.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132166853
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -322,6 +336,64 @@ public void resetNFAChanged() {
     
     		}
     
    +		Set<T> discardEvents = new HashSet<>();
    --- End diff --
    
    Move the whole added block to a new method. The method `process` is already long.
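A possible shape for the extracted method is sketched below, computing the set of events to discard per strategy as the quoted `switch` does. The class, method and parameter names are hypothetical. It assumes each match iterates in pattern order (for example a `LinkedHashMap`), since "events before the target pattern" depends on that order. It also guards the `subList` call against an empty event list, which the quoted code does not do.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper extracted from process(); not the PR's final code. */
final class DiscardEvents {

    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /** Events that no partial match may keep once {@code matches} have been emitted. */
    static <E> Set<E> discardEvents(
            Collection<Map<String, List<E>>> matches, SkipStrategy strategy, String patternName) {
        Set<E> discard = new HashSet<>();
        for (Map<String, List<E>> match : matches) {
            switch (strategy) {
                case SKIP_PAST_LAST_EVENT:
                    // Every event of the match is consumed.
                    match.values().forEach(discard::addAll);
                    break;
                case SKIP_TO_FIRST:
                case SKIP_TO_LAST:
                    for (Map.Entry<String, List<E>> entry : match.entrySet()) {
                        List<E> events = entry.getValue();
                        if (entry.getKey().equals(patternName)) {
                            if (strategy == SkipStrategy.SKIP_TO_LAST && !events.isEmpty()) {
                                // Keep only the last event of the target pattern.
                                discard.addAll(events.subList(0, events.size() - 1));
                            }
                            break; // patterns after the target stay usable
                        }
                        discard.addAll(events); // everything before the target pattern
                    }
                    break;
                default:
                    // SKIP_TO_NEXT_EVENT: nothing is discarded.
                    break;
            }
        }
        return discard;
    }
}
```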



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895280
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,29 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    --- End diff --
    
    Could this whole check go in the `Pattern` class? This will make the code clearer.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r136169035
  
    --- Diff: docs/dev/libs/cep.md ---
    @@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10))
     
     </div>
     
    +### After Match Skip Strategy
    +
    +For a given pattern, there can be many successful matches as data stream flows. In order to control how to restart the match process after a successful match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. There're four types of skip strategies, listed as follows:
    --- End diff --
    
    That sounds good to me, please change it during merge. Thanks a lot.
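To make "how to restart the match process after a successful match" from the proposed documentation concrete, the sketch below uses a deliberately simplified model. Events are characters, the pattern is a fixed, strictly contiguous string, and `String.indexOf` stands in for the NFA. For such a pattern, SKIP_TO_NEXT_EVENT (which, in the quoted `discardComputationStatesAccordingToStrategy`, discards nothing) amounts to trying every start position, while SKIP_PAST_LAST_EVENT resumes after the last event of each match. `SkipDemo` and `matchStarts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical illustration of two skip strategies on a toy, contiguous pattern. */
final class SkipDemo {

    /**
     * Returns the start index of every match of {@code pattern} in {@code input}.
     * skipPastLastEvent == false: resume at the event after the match's first event,
     *                             so matches may overlap (SKIP_TO_NEXT_EVENT).
     * skipPastLastEvent == true:  resume after the match's last event,
     *                             so matches never share events (SKIP_PAST_LAST_EVENT).
     */
    static List<Integer> matchStarts(String input, String pattern, boolean skipPastLastEvent) {
        List<Integer> starts = new ArrayList<>();
        int from = 0;
        while (from + pattern.length() <= input.length()) {
            int start = input.indexOf(pattern, from);
            if (start < 0) {
                break;
            }
            starts.add(start);
            from = skipPastLastEvent ? start + pattern.length() : start + 1;
        }
        return starts;
    }
}
```

For the input `"aaaa"` and the pattern `"aa"`, the first mode yields matches starting at 0, 1 and 2, while SKIP_PAST_LAST_EVENT yields only 0 and 2.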



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129297040
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_<code>RPV</code> and
    + * SKIP_TO_LAST_<code>RPV</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +	// fields
    +	String rpv = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
    +		if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
    +			if (rpv == null) {
    --- End diff --
    
    Why not merge this check into the previous `if`?
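    A minimal sketch of the merged check, for illustration only. `SkipStrategyValidation`, `validate` and the use of `IllegalArgumentException` are hypothetical, the enum only mirrors the constant names used later in this PR, and the serializer parts of the class are left out.

    ```java
    public class SkipStrategyValidation {

        // Mirrors the strategy names used in later revisions of this PR.
        enum SkipStrategy { SKIP_PAST_LAST_EVENT, SKIP_TO_NEXT_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

        static String validate(SkipStrategy strategy, String patternName) {
            // One combined condition instead of an if nested inside another if.
            if ((strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)
                    && patternName == null) {
                throw new IllegalArgumentException(strategy + " requires a pattern name");
            }
            return patternName;
        }

        public static void main(String[] args) {
            System.out.println(validate(SkipStrategy.SKIP_TO_FIRST, "end")); // prints end
            try {
                validate(SkipStrategy.SKIP_TO_LAST, null);
            } catch (IllegalArgumentException e) {
                System.out.println(e.getMessage()); // prints SKIP_TO_LAST requires a pattern name
            }
        }
    }
    ```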



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132384194
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +				Pattern<T, ?> pattern = currentPattern;
    +				while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +					pattern = pattern.getPrevious();
    +				}
    +				// pattern name match check.
    +				if (pattern == null) {
    +					throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +						"can not be found in the given Pattern");
    +				} else {
    +					// can not be used with optional states.
    +					if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    +						throw new MalformedPatternException("the AfterMatchSkipStrategy "
    +							+ afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
    +					}
    +				}
    +
    +				// start position check.
    +				if (pattern.getPrevious() == null) {
    --- End diff --
    
    I personally see no reason for semantics that throw a RuntimeException, and I can't think of any use case for it. Let's finish this PR without the switch and the exceptions, and open a separate JIRA for the switch, ideally with some use cases for those semantics, so we can agree on it there and see whether anyone needs it.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130266394
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_EVENT:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, event));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, event);
    +								if (callLevel > 0) {
    +									throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +								}
    +								// feed current matched event to the state.
    +								Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    +								resultingComputationStates.addAll(computationStates);
    +							} else if (previousState == null && currentState.getName().equals(skipStrategy.getPatternName())) {
    +								throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +							}
    +							break;
    +						case SKIP_TO_LAST:
    +							if (currentState.getName().equals(skipStrategy.getPatternName()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, event);
    +								if (callLevel > 0) {
    +									throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +								}
    +								// feed current matched event to the state.
    +								Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    +								resultingComputationStates.addAll(computationStates);
    +							}
    +							break;
    +					}
     					break;
     			}
     		}
     
    -		if (computationState.isStartState()) {
    -			int totalBranches = calculateIncreasingSelfState(
    -					outgoingEdges.getTotalIgnoreBranches(),
    -					outgoingEdges.getTotalTakeBranches());
    -
    -			DeweyNumber startVersion = computationState.getVersion().increase(totalBranches);
    -			ComputationState<T> startState = ComputationState.createStartState(this, computationState.getState(), startVersion);
    -			resultingComputationStates.add(startState);
    +		if (computationState.isStartState() &&
    +			skipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT) {
    --- End diff --
    
    If the skip strategy is **SKIP_PAST_LAST_EVENT**, then for the pattern **a nextby b** and the input **a1 c a2 b2** the output should be **a2 b2**. With the current implementation, however, I'm afraid there will be no output: after **a1** is processed, the start ComputationState is consumed, and there is no later opportunity to add a start ComputationState again, so **a2 b2** cannot be matched. Right? I'm afraid the other skip strategies may have similar issues.
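    To make the concern concrete, here is a toy model (not Flink's `NFA`), assuming **a nextby b** means `b` must immediately follow `a`. `StartStateSketch`, `match` and the `reseedStart` flag are hypothetical; names starting with `a`/`b` stand for events satisfying the respective conditions. Keeping a start state alive after every event yields **a2 b2**, while letting **a1** consume the only start state yields nothing.

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class StartStateSketch {

        /**
         * Toy matcher for "a" strictly followed by "b".
         * reseedStart = true keeps a start state available for every event;
         * reseedStart = false lets the first "a" consume the only start state.
         */
        static List<String> match(List<String> events, boolean reseedStart) {
            List<String> matches = new ArrayList<>();
            List<String> pending = new ArrayList<>(); // partial matches: an "a" waiting for its "b"
            boolean startAvailable = true;
            for (String e : events) {
                for (String a : pending) {
                    if (e.startsWith("b")) {
                        matches.add(a + " " + e);
                    }
                }
                // Strict contiguity: each partial match either completed above or is dropped now.
                pending = new ArrayList<>();
                if (startAvailable && e.startsWith("a")) {
                    pending.add(e);
                    startAvailable = reseedStart; // without re-seeding, the start state is gone
                }
            }
            return matches;
        }

        public static void main(String[] args) {
            List<String> input = Arrays.asList("a1", "c", "a2", "b2");
            System.out.println(match(input, true));  // prints [a2 b2]
            System.out.println(match(input, false)); // prints []
        }
    }
    ```

    The toy does not model the discarding step of SKIP_PAST_LAST_EVENT; with strict contiguity no partial match is left over once **a2 b2** completes.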



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170147
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -990,7 +1072,7 @@ public void serialize(NFA<T> record, DataOutputView target) throws IOException {
     			boolean handleTimeout = source.readBoolean();
     
     			NFA<T> nfa = new NFA<>(eventSerializer, windowTime, handleTimeout);
    -			nfa.states = states;
    +			nfa.addStates(states);
    --- End diff --
    
    This change can result in multiple start states being added. Anyway, this is an unrelated change.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132132564
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompatibilityUtil;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// fields
    +	String patternName = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    --- End diff --
    
    I would replace the constructors with factory methods. That way the requirements for `patternName` will be more visible and the code will be less bloated.
    
    I have something like this in mind:
    
    	public static AfterMatchSkipStrategy skipToFirst(String patternName) {
    		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_FIRST, patternName);
    	}
    
    	public static AfterMatchSkipStrategy skipToLast(String patternName) {
    		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_LAST, patternName);
    	}
    
    	public static AfterMatchSkipStrategy skipPastLastEvent() {
    		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_PAST_LAST_EVENT);
    	}
    
    	public static AfterMatchSkipStrategy skipToNextEvent() {
    		return new AfterMatchSkipStrategy(SkipStrategy.SKIP_TO_NEXT_EVENT);
    	}
    
    Then the usage will be:
    
        AfterMatchSkipStrategy.skipToLast("end")
    
    instead of:
    
        new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")
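    A self-contained sketch of how the factory-method variant could look, assuming the constructor becomes private and the name-taking factories reject `null` via `Objects.requireNonNull`. The class name `AfterMatchSkipStrategySketch` is hypothetical and the serializer code is omitted.

    ```java
    import java.io.Serializable;
    import java.util.Objects;

    public class AfterMatchSkipStrategySketch implements Serializable {

        private static final long serialVersionUID = 1L;

        public enum SkipStrategy { SKIP_PAST_LAST_EVENT, SKIP_TO_NEXT_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

        private final SkipStrategy strategy;
        private final String patternName; // only set for SKIP_TO_FIRST / SKIP_TO_LAST

        // Private: callers must go through the factory methods below.
        private AfterMatchSkipStrategySketch(SkipStrategy strategy, String patternName) {
            this.strategy = strategy;
            this.patternName = patternName;
        }

        public static AfterMatchSkipStrategySketch skipToFirst(String patternName) {
            return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_TO_FIRST,
                Objects.requireNonNull(patternName, "patternName"));
        }

        public static AfterMatchSkipStrategySketch skipToLast(String patternName) {
            return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_TO_LAST,
                Objects.requireNonNull(patternName, "patternName"));
        }

        public static AfterMatchSkipStrategySketch skipPastLastEvent() {
            return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_PAST_LAST_EVENT, null);
        }

        public static AfterMatchSkipStrategySketch skipToNextEvent() {
            return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
        }

        public SkipStrategy getStrategy() {
            return strategy;
        }

        public String getPatternName() {
            return patternName;
        }
    }
    ```

    With the constructor private, a strategy that needs a pattern name can no longer be created without one.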




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133478539
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
     		return Tuple2.of(result, timeoutResult);
     	}
     
    +	private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
    --- End diff --
    
    You are absolutely right. Thanks for the tip.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895143
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,29 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +				Pattern<T, ?> pattern = currentPattern;
    +				while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +					if (pattern.getPrevious() == null) {
    +						break;
    +					} else {
    +						pattern = pattern.getPrevious();
    +					}
    +				}
    +
    --- End diff --
    
    The above can become:
    ```
    while (pattern.getPrevious() != null && !pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    		pattern = pattern.getPrevious();
    }
    ```
    right?
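    A quick self-contained check of the suggested loop, using a hypothetical `Stage` class in place of `Pattern` (only a name and a `previous` link). The loop stops at the first stage when the name is absent instead of yielding `null`, so the not-found case has to be detected by re-checking the name afterwards.

    ```java
    public class PatternChainLookup {

        /** Hypothetical stand-in for Pattern: a name plus a link to the previous stage. */
        static final class Stage {
            final String name;
            final Stage previous;

            Stage(String name, Stage previous) {
                this.name = name;
                this.previous = previous;
            }
        }

        /** Walks back from the last stage; returns the stage with the given name, or null. */
        static Stage findByName(Stage last, String name) {
            Stage stage = last;
            while (stage.previous != null && !stage.name.equals(name)) {
                stage = stage.previous;
            }
            // The loop also ends at the first stage when nothing matched, so check again.
            return stage.name.equals(name) ? stage : null;
        }
    }
    ```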



[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    Hi @dawidwys, sorry for the late response.
    Thanks for your reviews; I have updated the tests and the documentation. Please take a look when you have time. Thanks.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132175418
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep;
    +
    +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
    +import org.apache.flink.cep.pattern.MalformedPatternException;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * After match skip tests.
    + */
    +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
    +
    +	private String resultPath;
    +	private String expected;
    +
    +	private String lateEventPath;
    +	private String expectedLateEvents;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	@Before
    +	public void before() throws Exception {
    +		resultPath = tempFolder.newFile().toURI().toString();
    +		expected = "";
    +
    +		lateEventPath = tempFolder.newFile().toURI().toString();
    +		expectedLateEvents = "";
    +	}
    +
    +	@After
    +	public void after() throws Exception {
    +		compareResultsByLinesInMemory(expected, resultPath);
    +		compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
    +	}
    +
    +	private PatternSelectFunction<Event, String> newIdSelectFunction(String ... names) {
    +		return new PatternSelectFunction<Event, String>() {
    +
    +			@Override
    +			public String select(Map<String, List<Event>> pattern) {
    +				StringBuilder builder = new StringBuilder();
    +				for (String name: names) {
    +					for (Event e : pattern.get(name)) {
    +						builder.append(e.getId()).append(",");
    +					}
    +				}
    +				return builder.toString();
    +			}
    +		};
    +	}
    +
    +	@Test
    +	public void testSkipToNext() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n2,3,4,\n3,4,5,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipPastLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToFirst() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(
    +				AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "end"))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n3,4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n4,5,6,7,";
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToLastWithEmptyException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "middle")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			}
    +		).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToLastWithInfiniteLoopException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "start")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			}
    +		).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToFirstWithInfiniteLoopException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "middle")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("x");
    +			}
    +		}).oneOrMore().optional().next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("b");
    +				}
    +			}
    +		).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("middle", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipPastLast2() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a1", 0.0),
    +			new Event(2, "a2", 0.0),
    +			new Event(3, "b1", 0.0),
    +			new Event(4, "b2", 0.0),
    +			new Event(5, "c1", 0.0),
    +			new Event(6, "c2", 0.0),
    +			new Event(7, "d1", 0.0),
    +			new Event(8, "d2", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("a", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).followedByAny("b").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("b");
    +				}
    +			}
    +		).followedByAny("c").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		})
    +			.followedByAny("d").where(new SimpleCondition<Event>() {
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			});
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("a", "b", "c", "d"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,3,5,7,\n1,3,6,7,\n1,4,5,7,\n1,4,6,7,\n2,3,5,7,\n2,3,6,7,\n2,4,5,7,\n2,4,6,7,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipPastLast3() throws Exception {
    --- End diff --
    
    same as above
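The expected string in `testSkipPastLast2` can be derived by hand. With `followedByAny`, every combination of an `a` (ids 1, 2), a `b` (ids 3, 4) and a `c` (ids 5, 6) completes on the first `d` event (id 7). Under SKIP_PAST_LAST_EVENT the next match may only start after event 7, so the partial matches that could otherwise also complete on id 8 are discarded. The hypothetical helper below only enumerates that product to reproduce the expected string. It is a reading of the test's expectation, not a run of the CEP library.

```java
public class SkipPastLastExpectedSketch {

    /**
     * Reproduces the expected output of testSkipPastLast2 by enumerating every
     * (a, b, c) id combination followed by the first "d" event (id 7).
     */
    static String expectedMatches() {
        int[] aIds = {1, 2};
        int[] bIds = {3, 4};
        int[] cIds = {5, 6};
        // Under SKIP_PAST_LAST_EVENT the next match must start after id 7,
        // so no match ending at the second "d" event (id 8) is expected.
        int dId = 7;

        StringBuilder sb = new StringBuilder();
        for (int a : aIds) {
            for (int b : bIds) {
                for (int c : cIds) {
                    if (sb.length() > 0) {
                        sb.append('\n');
                    }
                    // Same "id," format the test's select function appears to produce.
                    sb.append(a).append(',').append(b).append(',').append(c).append(',')
                        .append(dId).append(',');
                }
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(expectedMatches());
    }
}
```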


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132669361
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -859,7 +940,7 @@ public boolean apply(@Nullable State<T> input) {
     	 */
     	public static final class NFASerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
    --- End diff --
    
    Revert the changes to the serializer.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129516650
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
    @@ -36,6 +40,14 @@
     	 * @return Resulting pattern stream
     	 */
     	public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
    -		return new PatternStream<>(input, pattern);
    +		return new PatternStream<>(input, pattern, skipStrategy);
    +	}
    +
    +	/**
    +	 * Set the pattern's skip strategy after match.
    +	 * @param afterMatchSkipStrategy the skip strategy to use.
    +	 */
    +	public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
    --- End diff --
    
    Moved that to `Pattern`.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132139074
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -322,6 +336,64 @@ public void resetNFAChanged() {
     
     		}
     
    +		Set<T> discardEvents = new HashSet<>();
    +		switch(afterMatchSkipStrategy.getStrategy()) {
    +			case SKIP_TO_LAST:
    +				for (Map<String, List<T>> resultMap: result) {
    +					boolean matched = false;
    --- End diff --
    
    How about:
    
        for (Map<String, List<T>> resultMap: result) {
            for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
                if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
                    discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
                    break;
                } else {
                    discardEvents.addAll(keyMatches.getValue());
                }
            }
        }
    
    This way, we stop as soon as we reach the referenced pattern.
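The suggested loop can be tried outside Flink. Below is a minimal standalone sketch, assuming each match is a `Map<String, List<T>>` whose iteration order follows the pattern sequence (a `LinkedHashMap` here). `SkipToLastSketch` and `eventsToDiscard` are hypothetical names, not Flink API. Unlike the snippet above, it clamps the `size() - 1` index so an empty list cannot make `subList` throw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class SkipToLastSketch {

    /**
     * Collects the events a SKIP_TO_LAST-style strategy would discard: all events of the
     * patterns that precede the referenced pattern, plus all but the last event of the
     * referenced pattern itself. Assumes each match map iterates in pattern order.
     */
    static <T> Set<T> eventsToDiscard(List<Map<String, List<T>>> matches, String patternName) {
        Set<T> discardEvents = new HashSet<>();
        for (Map<String, List<T>> resultMap : matches) {
            for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
                List<T> events = keyMatches.getValue();
                if (keyMatches.getKey().equals(patternName)) {
                    // Keep the last event of the referenced pattern and stop scanning this match.
                    discardEvents.addAll(events.subList(0, Math.max(0, events.size() - 1)));
                    break;
                }
                // Patterns before the referenced one are discarded entirely.
                discardEvents.addAll(events);
            }
        }
        return discardEvents;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> match = new LinkedHashMap<>();
        match.put("start", Arrays.asList(1));
        match.put("middle", Arrays.asList(2, 3));
        match.put("end", Arrays.asList(4));

        List<Map<String, List<Integer>>> matches = new ArrayList<>();
        matches.add(match);

        // Events 1 ("start") and 2 (all but the last "middle" event) are discarded.
        System.out.println(eventsToDiscard(matches, "middle"));
    }
}
```

Breaking out of the inner loop relies entirely on the map's iteration order; with an unordered map the set of discarded events would be arbitrary.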



---

[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    I think you can merge it, @dawidwys.
    I was following the evolution of this PR and I think it looks good ;)
    Thanks for the work, both @yestinchen and @dawidwys!


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132137487
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -157,15 +157,28 @@
     	 */
     	private boolean nfaChanged;
     
    +	/**
    +	 * Store the skip strategy.
    +	 */
    +	private AfterMatchSkipStrategy afterMatchSkipStrategy;
    +
    +	public NFA(
    --- End diff --
    
    I don't see the point of creating another ctor.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132349108
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompatibilityUtil;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// fields
    +	String patternName = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    --- End diff --
    
    Good idea, I like that.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133478273
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,29 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    --- End diff --
    
    We only need to check the skip strategy before compiling the `Pattern` into an `NFA`, so I think it's more reasonable to place the check here. We also need to verify that the `patternName` field of the `AfterMatchSkipStrategy` is a valid reference, which cannot easily be done in the `Pattern` class.
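A minimal sketch of the kind of compile-time reference check described above, assuming the compiler can collect the names of all patterns in the sequence. `SkipStrategyValidationSketch` and `checkSkipStrategy` are hypothetical names. The PR's own check reports `MalformedPatternException` (as `testSkipToFirstWithInfiniteLoopException` expects) and may validate more than name existence; this sketch throws `IllegalArgumentException` only to stay self-contained.

```java
import java.util.Arrays;
import java.util.List;

public class SkipStrategyValidationSketch {

    /** Mirrors the SkipStrategy enum from the AfterMatchSkipStrategy diff in this thread. */
    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Hypothetical compile-time check: strategies that reference a pattern must name one
     * that exists in the pattern sequence being compiled.
     */
    static void checkSkipStrategy(SkipStrategy strategy, String patternName, List<String> patternNames) {
        boolean needsReference =
            strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST;
        if (needsReference && !patternNames.contains(patternName)) {
            throw new IllegalArgumentException(
                "Skip strategy " + strategy + " references unknown pattern: " + patternName);
        }
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("start", "middle", "end");
        checkSkipStrategy(SkipStrategy.SKIP_TO_FIRST, "middle", names); // valid reference
        try {
            checkSkipStrategy(SkipStrategy.SKIP_TO_LAST, "missing", names);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Doing this in the compiler rather than in `Pattern` means the full list of names is already available, which is the point made in the comment.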


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132136797
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompatibilityUtil;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// fields
    +	String patternName = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    +		if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
    +			throw new IllegalArgumentException("the patternName field can not be empty when SkipStrategy is " + strategy);
    +		}
    +		this.strategy = strategy;
    +		this.patternName = patternName;
    +	}
    +
    +	public SkipStrategy getStrategy() {
    +		return strategy;
    +	}
    +
    +	public String getPatternName() {
    +		return patternName;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "AfterMatchStrategy{" +
    +			"strategy=" + strategy +
    +			", patternName=" + patternName +
    +			'}';
    +	}
    +
    +	/**
    +	 * Skip Strategy Enum.
    +	 */
    +	public enum SkipStrategy{
    +		SKIP_TO_NEXT_EVENT,
    +		SKIP_PAST_LAST_EVENT,
    +		SKIP_TO_FIRST,
    +		SKIP_TO_LAST
    +	}
    +
    +	/**
    +	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
    +	 */
    +	public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
    +
    +		private static final int VERSION = 1;
    +
    +		/**
    +		 * This empty constructor is required for deserializing the configuration.
    +		 */
    +		public AfterMatchSkipStrategyConfigSnapshot() {
    +		}
    +
    +		public AfterMatchSkipStrategyConfigSnapshot(
    +			TypeSerializer<SkipStrategy> enumSerializer,
    +			TypeSerializer<String> stringSerializer) {
    +
    +			super(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public int getVersion() {
    +			return VERSION;
    +		}
    +	}
    +
    +	/**
    +	 *  A {@link TypeSerializer} for the {@link AfterMatchSkipStrategy}.
    +	 */
    +	public static class AfterMatchSkipStrategySerializer extends TypeSerializer<AfterMatchSkipStrategy> {
    +
    +		private final TypeSerializer<SkipStrategy> enumSerializer;
    +		private final TypeSerializer<String> stringSerializer;
    +
    +		public AfterMatchSkipStrategySerializer(TypeSerializer<SkipStrategy> enumSerializer, TypeSerializer<String> stringSerializer) {
    +			this.enumSerializer = enumSerializer;
    +			this.stringSerializer = stringSerializer;
    +		}
    +
    +		@Override
    +		public boolean isImmutableType() {
    +			return false;
    +		}
    +
    +		@Override
    +		public TypeSerializer<AfterMatchSkipStrategy> duplicate() {
    +			return new AfterMatchSkipStrategySerializer(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy createInstance() {
    +			return new AfterMatchSkipStrategy();
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy copy(AfterMatchSkipStrategy from) {
    +			try {
    +				ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +				ObjectOutputStream oos = new ObjectOutputStream(baos);
    +
    +				serialize(from, new DataOutputViewStreamWrapper(oos));
    +
    +				oos.close();
    +				baos.close();
    +
    +				byte[] data = baos.toByteArray();
    +
    +				ByteArrayInputStream bais = new ByteArrayInputStream(data);
    +				ObjectInputStream ois = new ObjectInputStream(bais);
    +
    +				AfterMatchSkipStrategy copy = deserialize(new DataInputViewStreamWrapper(ois));
    +				ois.close();
    +				bais.close();
    +
    +				return copy;
    +			} catch (IOException e) {
    +				throw new RuntimeException("Could not copy AfterMatchSkipStrategy.", e);
    +			}
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy copy(AfterMatchSkipStrategy from, AfterMatchSkipStrategy reuse) {
    +			return copy(from);
    +		}
    +
    +		@Override
    +		public int getLength() {
    +			return -1;
    +		}
    +
    +		@Override
    +		public void serialize(AfterMatchSkipStrategy record, DataOutputView target) throws IOException {
    +			enumSerializer.serialize(record.getStrategy(), target);
    +			stringSerializer.serialize(record.getPatternName(), target);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy deserialize(DataInputView source) throws IOException {
    +			SkipStrategy skipStrategy = enumSerializer.deserialize(source);
    +			String rpv = stringSerializer.deserialize(source);
    +			return new AfterMatchSkipStrategy(skipStrategy, rpv);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy deserialize(AfterMatchSkipStrategy reuse, DataInputView source) throws IOException {
    +			return deserialize(source);
    +		}
    +
    +		@Override
    +		public void copy(DataInputView source, DataOutputView target) throws IOException {
    +			SkipStrategy skipStrategy = enumSerializer.deserialize(source);
    +			enumSerializer.serialize(skipStrategy, target);
    +			String rpv = stringSerializer.deserialize(source);
    --- End diff --
    
    `rpv` -> `patternName`


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132176196
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,276 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompatibilityUtil;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +import java.util.List;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// fields
    +	String patternName = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
    +		if (patternName == null && (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST)) {
    +			throw new IllegalArgumentException("the patternName field can not be empty when SkipStrategy is " + strategy);
    +		}
    +		this.strategy = strategy;
    +		this.patternName = patternName;
    +	}
    +
    +	public SkipStrategy getStrategy() {
    +		return strategy;
    +	}
    +
    +	public String getPatternName() {
    +		return patternName;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "AfterMatchStrategy{" +
    +			"strategy=" + strategy +
    +			", patternName=" + patternName +
    +			'}';
    +	}
    +
    +	/**
    +	 * Skip Strategy Enum.
    +	 */
    +	public enum SkipStrategy{
    +		SKIP_TO_NEXT_EVENT,
    +		SKIP_PAST_LAST_EVENT,
    +		SKIP_TO_FIRST,
    +		SKIP_TO_LAST
    +	}
    +
    +	/**
    +	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
    +	 */
    +	public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
    +
    +		private static final int VERSION = 1;
    +
    +		/**
    +		 * This empty constructor is required for deserializing the configuration.
    +		 */
    +		public AfterMatchSkipStrategyConfigSnapshot() {
    +		}
    +
    +		public AfterMatchSkipStrategyConfigSnapshot(
    +			TypeSerializer<SkipStrategy> enumSerializer,
    +			TypeSerializer<String> stringSerializer) {
    +
    +			super(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public int getVersion() {
    +			return VERSION;
    +		}
    +	}
    +
    +	/**
    +	 *  A {@link TypeSerializer} for the {@link AfterMatchSkipStrategy}.
    +	 */
    +	public static class AfterMatchSkipStrategySerializer extends TypeSerializer<AfterMatchSkipStrategy> {
    --- End diff --
    
    If we remove `AfterMatchSkipStrategy` from `NFA`, I think a custom `TypeSerializer` will be unnecessary.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129518272
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_ROW:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, outgoingEdges);
    --- End diff --
    
    Now I keep `startComputationState` instead of `startState` in the NFA, so it can calculate the `outgoingEdges` from the start state when needed. Is this right?


---

[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    @dawidwys @dianfu I've updated the approach according to the document. Feel free to comment.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132897107
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -158,9 +158,9 @@
     	private boolean nfaChanged;
     
     	public NFA(
    -			final TypeSerializer<T> eventSerializer,
    -			final long windowTime,
    -			final boolean handleTimeout) {
    +		final TypeSerializer<T> eventSerializer,
    --- End diff --
    
    Please revert the unrelated formatting change.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130317122
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_EVENT:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, event));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    --- End diff --
    
    Yes, you are right.


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132166483
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -157,15 +157,28 @@
     	 */
     	private boolean nfaChanged;
     
    +	/**
    +	 * Store the skip strategy.
    +	 */
    +	private AfterMatchSkipStrategy afterMatchSkipStrategy;
    --- End diff --
    
    I would not add it to `NFA`. I think it is static data that should not be serialized with the `NFA` every time. We already have trouble removing `State`s, `Condition`s and `handleTimeout` from the serialized `NFA`, and storing it there also makes maintaining serialization compatibility harder. How about adding it to the `NFAFactory` and passing it to `NFA#process` (the only place where it is needed)?
    
    I know this will require passing it through multiple places, e.g. `KeyedCEPPatternOperator` and `TimeoutKeyedCEPPatternOperator`, but that will be easier after #4320.
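A minimal, non-Flink sketch of the suggested split, assuming a toy matcher that pairs any two consecutive events. The NFA stand-in keeps only per-key runtime state, while a factory stand-in owns the skip strategy and passes it into every `process` call, so the strategy never becomes part of the serialized state. `ToyNfa`, `ToyNfaFactory` and their methods are hypothetical and only echo the roles of Flink's `NFA` and `NFAFactory`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class StrategyOutsideStateSketch {

    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT }

    /** Toy NFA: its only field is per-key runtime state, i.e. what a checkpoint would hold. */
    static final class ToyNfa {
        final List<String> partialMatch = new ArrayList<>();

        /** Matches any two consecutive events; the skip strategy arrives as a parameter. */
        List<List<String>> process(String event, SkipStrategy strategy) {
            List<List<String>> matches = new ArrayList<>();
            partialMatch.add(event);
            if (partialMatch.size() == 2) {
                matches.add(new ArrayList<>(partialMatch));
                if (strategy == SkipStrategy.SKIP_PAST_LAST_EVENT) {
                    partialMatch.clear();   // next match starts after the last matched event
                } else {
                    partialMatch.remove(0); // next match may start at the following event
                }
            }
            return matches;
        }
    }

    /** Toy factory: owns the static configuration and passes it to every process() call. */
    static final class ToyNfaFactory {
        final SkipStrategy strategy;

        ToyNfaFactory(SkipStrategy strategy) {
            this.strategy = strategy;
        }

        List<List<String>> run(List<String> events) {
            ToyNfa nfa = new ToyNfa();
            List<List<String>> all = new ArrayList<>();
            for (String e : events) {
                all.addAll(nfa.process(e, strategy));
            }
            return all;
        }
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c");
        System.out.println(new ToyNfaFactory(SkipStrategy.SKIP_TO_NEXT_EVENT).run(events));
        System.out.println(new ToyNfaFactory(SkipStrategy.SKIP_PAST_LAST_EVENT).run(events));
    }
}
```

Because the strategy is only a method argument, changing it would not touch the NFA's serializer or its compatibility checks.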


---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133091608
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
     		return Tuple2.of(result, timeoutResult);
     	}
     
    +	private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
    +		Collection<Map<String, List<T>>> matchedResult, AfterMatchSkipStrategy afterMatchSkipStrategy) {
    +		Set<T> discardEvents = new HashSet<>();
    +		switch(afterMatchSkipStrategy.getStrategy()) {
    +			case SKIP_TO_LAST:
    +				for (Map<String, List<T>> resultMap: matchedResult) {
    +					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
    +						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    +							discardEvents.addAll(keyMatches.getValue().subList(0, keyMatches.getValue().size() - 1));
    +							break;
    +						} else {
    +							discardEvents.addAll(keyMatches.getValue());
    +						}
    +					}
    +				}
    +				break;
    +			case SKIP_TO_FIRST:
    +				for (Map<String, List<T>> resultMap: matchedResult) {
    +					for (Map.Entry<String, List<T>> keyMatches : resultMap.entrySet()) {
    +						if (keyMatches.getKey().equals(afterMatchSkipStrategy.getPatternName())) {
    +							break;
    +						} else {
    +							discardEvents.addAll(keyMatches.getValue());
    +						}
    +					}
    +				}
    +				break;
    +			case SKIP_PAST_LAST_EVENT:
    +				for (Map<String, List<T>> resultMap: matchedResult) {
    +					for (List<T> eventList: resultMap.values()) {
    +						discardEvents.addAll(eventList);
    +					}
    +				}
    +				break;
    +		}
    +		if (!discardEvents.isEmpty()) {
    +			List<ComputationState<T>> discardStates = new ArrayList<>();
    +			for (ComputationState<T> computationState : computationStates) {
    +				Map<String, List<T>> partialMatch = extractCurrentMatches(computationState);
    +				for (List<T> list: partialMatch.values()) {
    +					for (T e: list) {
    +						if (discardEvents.contains(e)) {
    +							// discard the computation state.
    +							eventSharedBuffer.release(
    +								NFAStateNameHandler.getOriginalNameFromInternal(
    +									computationState.getState().getName()),
    +								computationState.getEvent(),
    +								computationState.getTimestamp(),
    +								computationState.getCounter()
    +							);
    +							discardStates.add(computationState);
    --- End diff --
    
    Yes, you are right. Thanks for pointing it out!
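    For readers following the hunk above, a simplified standalone model of the discard step may help. It is a sketch, not the PR's code: events are plain `String`s, `DiscardSketch` is a hypothetical name, and it assumes each match map iterates in pattern order (e.g. a `LinkedHashMap`), which the quoted loops also rely on. It stops scanning a partial match at the first overlapping event, so each discarded partial match is dropped only once, and it guards the `subList(0, size - 1)` call against an empty list.

    ```java
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    final class DiscardSketch {
        enum Strategy { SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

        // Events of a finished match that no later match may reuse.
        static Set<String> eventsToDiscard(Map<String, List<String>> match, Strategy strategy, String patternName) {
            Set<String> discard = new HashSet<>();
            for (Map.Entry<String, List<String>> entry : match.entrySet()) {
                List<String> events = entry.getValue();
                if (strategy != Strategy.SKIP_PAST_LAST_EVENT && entry.getKey().equals(patternName)) {
                    // SKIP_TO_LAST keeps only the last event of the named pattern;
                    // SKIP_TO_FIRST keeps all of the named pattern's events.
                    if (strategy == Strategy.SKIP_TO_LAST && !events.isEmpty()) {
                        discard.addAll(events.subList(0, events.size() - 1));
                    }
                    break;
                }
                discard.addAll(events);
            }
            return discard;
        }

        // Keeps only the partial matches that share no event with the discard set.
        static List<List<String>> survivors(List<List<String>> partialMatches, Set<String> discard) {
            List<List<String>> kept = new ArrayList<>();
            for (List<String> partial : partialMatches) {
                boolean overlaps = false;
                for (String event : partial) {
                    if (discard.contains(event)) {
                        overlaps = true;
                        break; // one hit is enough; the partial match is dropped exactly once
                    }
                }
                if (!overlaps) {
                    kept.add(partial);
                }
            }
            return kept;
        }
    }
    ```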



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896369
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java ---
    @@ -42,20 +43,21 @@
     	private static final long serialVersionUID = 3570542177814518158L;
     
     	public TimeoutKeyedCEPPatternOperator(
    -			TypeSerializer<IN> inputSerializer,
    -			boolean isProcessingTime,
    -			TypeSerializer<KEY> keySerializer,
    -			NFACompiler.NFAFactory<IN> nfaFactory,
    -			boolean migratingFromOldKeyedOperator,
    -			EventComparator<IN> comparator) {
    +		TypeSerializer<IN> inputSerializer,
    --- End diff --
    
    Please revert the unrelated formatting changes.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130264164
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_EVENT:
    +							if (nextState.isFinal()) {
    --- End diff --
    
    This should also consider the **Proceed to Final state** situation.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129298052
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_<code>RPV</code> and
    + * SKIP_TO_LAST_<code>RPV</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +	// fields
    +	String rpv = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
    +		if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
    +			if (rpv == null) {
    +				throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy);
    +			}
    +		}
    +		this.strategy = strategy;
    +		this.rpv = rpv;
    +	}
    +
    +	public SkipStrategy getStrategy() {
    +		return strategy;
    +	}
    +
    +	public String getRpv() {
    +		return rpv;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "AfterMatchStrategy{" +
    +			"strategy=" + strategy +
    +			", rpv=" + rpv +
    +			'}';
    +	}
    +
    +	/**
    +	 * Skip Strategy Enum.
    +	 */
    +	public enum SkipStrategy{
    +		SKIP_TO_NEXT_ROW,
    +		SKIP_PAST_LAST_ROW,
    +		SKIP_TO_FIRST,
    +		SKIP_TO_LAST
    +	}
    +
    +	/**
    +	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
    +	 */
    +	public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
    +
    +		private static final int VERSION = 1;
    +
    +		/**
    +		 * This empty constructor is required for deserializing the configuration.
    +		 */
    +		public AfterMatchSkipStrategyConfigSnapshot() {
    +		}
    +
    +		public AfterMatchSkipStrategyConfigSnapshot(
    +			TypeSerializer<SkipStrategy> enumSerializer,
    +			TypeSerializer<String> stringSerializer) {
    +
    +			super(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public int getVersion() {
    +			return VERSION;
    +		}
    +	}
    +
    +	/**
    +	 *  A {@link TypeSerializer} for the {@link AfterMatchSkipStrategy}.
    +	 */
    +	public static class AfterMatchSkipStrategySerializer extends TypeSerializer<AfterMatchSkipStrategy> {
    +
    +		private final TypeSerializer<SkipStrategy> enumSerializer;
    +		private final TypeSerializer<String> stringSerializer;
    +
    +		public AfterMatchSkipStrategySerializer(TypeSerializer<SkipStrategy> enumSerializer, TypeSerializer<String> stringSerializer) {
    +			this.enumSerializer = enumSerializer;
    +			this.stringSerializer = stringSerializer;
    +		}
    +
    +		@Override
    +		public boolean isImmutableType() {
    +			return false;
    +		}
    +
    +		@Override
    +		public TypeSerializer<AfterMatchSkipStrategy> duplicate() {
    +			return new AfterMatchSkipStrategySerializer(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy createInstance() {
    +			return new AfterMatchSkipStrategy();
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy copy(AfterMatchSkipStrategy from) {
    +			try {
    +				ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +				ObjectOutputStream oos = new ObjectOutputStream(baos);
    +
    +				serialize(from, new DataOutputViewStreamWrapper(oos));
    +
    +				oos.close();
    +				baos.close();
    +
    +				byte[] data = baos.toByteArray();
    +
    +				ByteArrayInputStream bais = new ByteArrayInputStream(data);
    +				ObjectInputStream ois = new ObjectInputStream(bais);
    +
    +				AfterMatchSkipStrategy copy = deserialize(new DataInputViewStreamWrapper(ois));
    +				ois.close();
    +				bais.close();
    +
    +				return copy;
    +			} catch (IOException e) {
    +				throw new RuntimeException("Could not copy AfterMatchSkipStrategy.", e);
    +			}
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy copy(AfterMatchSkipStrategy from, AfterMatchSkipStrategy reuse) {
    +			return copy(from);
    +		}
    +
    +		@Override
    +		public int getLength() {
    +			return -1;
    +		}
    +
    +		@Override
    +		public void serialize(AfterMatchSkipStrategy record, DataOutputView target) throws IOException {
    +			enumSerializer.serialize(record.getStrategy(), target);
    +			stringSerializer.serialize(record.getRpv(), target);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy deserialize(DataInputView source) throws IOException {
    +			SkipStrategy skipStrategy = enumSerializer.deserialize(source);
    +			String rpv = stringSerializer.deserialize(source);
    +			return new AfterMatchSkipStrategy(skipStrategy, rpv);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy deserialize(AfterMatchSkipStrategy reuse, DataInputView source) throws IOException {
    +			return deserialize(source);
    +		}
    +
    +		@Override
    +		public void copy(DataInputView source, DataOutputView target) throws IOException {
    +			SkipStrategy skipStrategy = enumSerializer.deserialize(source);
    +			enumSerializer.serialize(skipStrategy, target);
    +			String rpv = stringSerializer.deserialize(source);
    +			stringSerializer.serialize(rpv, target);
    +		}
    +
    +		@Override
    +		public boolean equals(Object obj) {
    +			return obj == this ||
    +				(obj != null && obj.getClass().equals(getClass()));
    +		}
    +
    +		@Override
    +		public boolean canEqual(Object obj) {
    +			return true;
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return 37 * enumSerializer.hashCode() + stringSerializer.hashCode();
    +		}
    +
    +		@Override
    +		public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +			return new AfterMatchSkipStrategyConfigSnapshot(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public CompatibilityResult<AfterMatchSkipStrategy> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +			return CompatibilityResult.compatible();
    --- End diff --
    
    This method should check the compatibility of the internal serializers.
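    One way to read this suggestion, sketched with hypothetical simplified types (`Compat`, `NestedSerializer`, `VersionedSerializer` and `CompositeSerializerSketch` are not Flink's API): the composite serializer stores one snapshot per nested serializer (here standing in for the enum and string serializers), and on restore asks each nested serializer whether its own previous snapshot is compatible. It reports migration if any nested serializer, or the snapshot layout itself, does not match, instead of unconditionally returning "compatible".

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    // Hypothetical, simplified compatibility outcome.
    enum Compat { COMPATIBLE, REQUIRES_MIGRATION }

    // Hypothetical nested serializer that can snapshot and check its own configuration.
    interface NestedSerializer {
        String snapshotConfiguration();

        Compat ensureCompatibility(String previousSnapshot);
    }

    final class VersionedSerializer implements NestedSerializer {
        private final String name;
        private final int formatVersion;

        VersionedSerializer(String name, int formatVersion) {
            this.name = name;
            this.formatVersion = formatVersion;
        }

        @Override
        public String snapshotConfiguration() {
            return name + "@" + formatVersion;
        }

        @Override
        public Compat ensureCompatibility(String previousSnapshot) {
            return snapshotConfiguration().equals(previousSnapshot) ? Compat.COMPATIBLE : Compat.REQUIRES_MIGRATION;
        }
    }

    final class CompositeSerializerSketch {
        private final List<NestedSerializer> nested;

        CompositeSerializerSketch(NestedSerializer... nested) {
            this.nested = Arrays.asList(nested);
        }

        // One nested snapshot per nested serializer, in a fixed order.
        List<String> snapshotConfiguration() {
            List<String> snapshots = new ArrayList<>();
            for (NestedSerializer serializer : nested) {
                snapshots.add(serializer.snapshotConfiguration());
            }
            return snapshots;
        }

        // Compatible only if the layout matches and every nested serializer agrees.
        Compat ensureCompatibility(List<String> previous) {
            if (previous == null || previous.size() != nested.size()) {
                return Compat.REQUIRES_MIGRATION;
            }
            for (int i = 0; i < nested.size(); i++) {
                if (nested.get(i).ensureCompatibility(previous.get(i)) == Compat.REQUIRES_MIGRATION) {
                    return Compat.REQUIRES_MIGRATION;
                }
            }
            return Compat.COMPATIBLE;
        }
    }
    ```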



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129303933
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_ROW:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, outgoingEdges);
    +								if (callLevel > 0) {
    +									throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +								}
    +								// feed current matched event to the state.
    +								Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    --- End diff --
    
    Why do we feed the event?



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895639
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>.
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    --- End diff --
    
    This can be `private`.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896183
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -120,13 +121,16 @@
     
     	private final EventComparator<IN> comparator;
     
    +	final AfterMatchSkipStrategy afterMatchSkipStrategy;
    --- End diff --
    
    This can be `protected`.



[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    Hi @yestinchen,
    Thanks for the update.
    
    After a second round of review, I found many problems with the current approach: in most cases it returns only the first match in a stream.
    
    1. Let's analyze the pattern `A B C` with `SKIP_TO_FIRST C` and the sequence `a1 b1 c1 a2 b2 c2`. It returns only `a1 b1 c1` and leaves the NFA without any valid `ComputationState`s, which stops processing.
    
    2. Another problem is that we do not handle matches that may finish before a previously started match does. E.g. for the pattern
    
    ```
    Pattern<Event, ?> pattern = Pattern.<Event>begin("ab").where(new SimpleCondition<Event>() {
    	@Override
    	public boolean filter(Event value) throws Exception {
    		return value.getName().equals("a") || value.getName().equals("b");
    	}
    }).followedBy("c").where(new IterativeCondition<Event>() {
    	@Override
    	public boolean filter(Event value, Context<Event> ctx) throws Exception {
    		return value.getName().equals("c") && ctx.getEventsForPattern("ab").iterator().next().getPrice() == value.getPrice();
    	}
    }).setAfterMatchSkipStrategy(new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT));
    ```
    
    and the sequence `a(price = 1) b(price = 2) c(price = 2)`, I think the desired behaviour would be to start a new match after the `c` event, but that does not happen, because the matching started at `a` and no new matching is started at `b`.
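    To make the expected result of example 1 concrete, here is a bounded, SQL-style reference model. It is a hypothetical sketch, not Flink's NFA: it assumes strict contiguity, exactly one event per pattern variable (so the first and last positions of a variable coincide), and that an event such as `a1` matches variable `A` by its first letter. Under those assumptions, `A B C` with `SKIP_TO_FIRST C` over `a1 b1 c1 a2 b2 c2` yields both `a1 b1 c1` and `a2 b2 c2`.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    final class BoundedSkipModel {
        enum Skip { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

        // Illustrative predicate: event "a1" matches variable "A".
        private static boolean matches(String variable, String event) {
            return Character.toLowerCase(variable.charAt(0)) == Character.toLowerCase(event.charAt(0));
        }

        static List<List<String>> findMatches(List<String> pattern, List<String> events, Skip skip, String target) {
            List<List<String>> result = new ArrayList<>();
            int start = 0;
            while (start + pattern.size() <= events.size()) {
                boolean matched = true;
                for (int k = 0; k < pattern.size(); k++) {
                    if (!matches(pattern.get(k), events.get(start + k))) {
                        matched = false;
                        break;
                    }
                }
                if (!matched) {
                    start++;
                    continue;
                }
                result.add(new ArrayList<>(events.subList(start, start + pattern.size())));
                int next;
                switch (skip) {
                    case SKIP_PAST_LAST_EVENT:
                        next = start + pattern.size();
                        break;
                    case SKIP_TO_FIRST:
                    case SKIP_TO_LAST:
                        // One event per variable, so both strategies resume at the target's position.
                        next = start + pattern.indexOf(target);
                        break;
                    default:
                        next = start + 1;
                }
                // SQL raises an error when the resume point is the match start or the target is
                // missing; this model simply advances by one event instead.
                start = Math.max(next, start + 1);
            }
            return result;
        }
    }
    ```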
    
    ---
    
    Some general notes:
    
    1. I think the SQL specification does not fit the CEP library well, as we do not operate on a partitioned/bounded collection of events, whereas the specification assumes such bounded data. We would benefit from additional documentation on how the AFTER MATCH clause works for unbounded data, e.g. what an **_empty match_** means:
    
    > Note that the AFTER MATCH SKIP syntax only determines the point to resume scanning for a match after a non-empty match. When an empty match is found, one row is skipped (as if SKIP TO NEXT ROW had been specified). Thus an empty match never causes one of these exceptions.
    
    etc.
    
    2. I really don't like having so many cases in which a `RuntimeException` can be thrown. I feel the reason for using CEP is constantly running jobs that search for patterns in a stream, rather than ad-hoc queries.
    E.g. for a pattern like `A B? C` with `SKIP_TO_LAST B`, a sequence like `a c` results in an exception and the job being killed. In my opinion this does not suit a constantly running job. From the operational side, running such patterns would be interesting, to say the least ;), as they depend so much on the arriving data.
    
    3. I don't know the reasoning, but Esper, which was mentioned as the other library (besides Oracle) that supports the `MATCH_RECOGNIZE` clause, does not support `AFTER MATCH` at all.
    
    4. I found out there was already ongoing work to introduce part of `AFTER MATCH` (the `SKIP_PAST_LAST` strategy). The corresponding JIRA is https://issues.apache.org/jira/browse/FLINK-3703 and the closed PR is #2367.
    
    To sum up, thanks @yestinchen for the work. Unfortunately, I think the clause needs a bit more conceptual discussion before we can introduce this change. I think the `SKIP_PAST_LAST` behaviour would be very helpful (in fact, there have already been requests for it on the mailing list) and is the most straightforward to implement. I would love to hear your opinions, @yestinchen, as well as @kl0u and @dianfu.
    




[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132894559
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -96,7 +97,8 @@
     		} else {
     			final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
     			nfaFactoryCompiler.compileFactory();
    -			return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
    +			return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(),
    +				nfaFactoryCompiler.getStates(), timeoutHandling);
     		}
    --- End diff --
    
    Unrelated formatting change. Please revert.



[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    @dianfu Thanks for your review.
    I found that @dawidwys wrote a draft of the JIRA's implementation. I'll go through it first and address those issues in this PR later.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132348950
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +				Pattern<T, ?> pattern = currentPattern;
    +				while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +					pattern = pattern.getPrevious();
    +				}
    +				// pattern name match check.
    +				if (pattern == null) {
    +					throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +						"can not be found in the given Pattern");
    +				} else {
    +					// can not be used with optional states.
    +					if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    +						throw new MalformedPatternException("the AfterMatchSkipStrategy "
    +							+ afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
    +					}
    +				}
    +
    +				// start position check.
    +				if (pattern.getPrevious() == null) {
    --- End diff --
    
    I agree with you that the fallback approach is much easier to understand and maintain.
    If we discard nothing, the effective semantics is to use SKIP_TO_NEXT_EVENT for the next match process. But that changes the matching semantics, which may lead to incorrect results, so I think users should be aware of what happens. My original thought was to add a configuration switch that lets users choose between throwing an exception and falling back to a default skip strategy.
    Do you have any ideas about that?
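    A minimal sketch of such a switch, assuming a hypothetical `MissingTargetPolicy` option (not an existing Flink setting) and a hypothetical `SkipTargetResolver` helper that receives the stream positions of the events assigned to each pattern in a finished match. When the target pattern took no events (e.g. `A B? C` with `SKIP_TO_LAST B` on `a c`) or the resume point would be the match start, it either fails or falls back to the SKIP_TO_NEXT_EVENT resume point.

    ```java
    import java.util.List;
    import java.util.Map;

    final class SkipTargetResolver {
        // Hypothetical configuration switch discussed in this thread.
        enum MissingTargetPolicy { FAIL, FALLBACK_TO_NEXT_EVENT }

        /**
         * Position at which the next match attempt starts for SKIP_TO_FIRST (toLast = false)
         * or SKIP_TO_LAST (toLast = true) on the given target pattern.
         */
        static long nextStart(Map<String, List<Long>> positionsByPattern, String target, boolean toLast,
                              long matchStart, MissingTargetPolicy policy) {
            List<Long> positions = positionsByPattern.get(target);
            if (positions == null || positions.isEmpty()) {
                return unusableTarget("pattern '" + target + "' took no events in this match", matchStart, policy);
            }
            long resume = toLast ? positions.get(positions.size() - 1) : positions.get(0);
            if (resume == matchStart) {
                // Resuming at the match start would find the same match again.
                return unusableTarget("resume point equals the match start", matchStart, policy);
            }
            return resume;
        }

        private static long unusableTarget(String reason, long matchStart, MissingTargetPolicy policy) {
            if (policy == MissingTargetPolicy.FAIL) {
                throw new IllegalStateException("Cannot apply skip strategy: " + reason);
            }
            return matchStart + 1; // same resume point as SKIP_TO_NEXT_EVENT
        }
    }
    ```

    Keeping the decision in one place makes the trade-off explicit: `FAIL` surfaces the semantic problem, while `FALLBACK_TO_NEXT_EVENT` keeps a long-running job alive at the cost of possibly overlapping matches.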



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895590
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>.
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    --- End diff --
    
    Please add a `serialVersionUID`, e.g.: 
    `private static final long serialVersionUID = -3601462998929198774L;`
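    Putting this and the earlier `private` suggestion together, the class might look roughly like the sketch below. It is an illustration under assumptions, not the merged code: it keeps the four strategy names and the `getPatternName()` accessor seen elsewhere in this thread, makes the fields `private final`, reuses the example `serialVersionUID`, and rejects a missing pattern name for SKIP_TO_FIRST/SKIP_TO_LAST. The class is package-private only so the snippet compiles standalone.

    ```java
    import java.io.Serializable;

    /** Indicates where matching resumes after a match is found (sketch). */
    class AfterMatchSkipStrategy implements Serializable {
        private static final long serialVersionUID = -3601462998929198774L;

        /** The supported skip strategies. */
        public enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

        private final SkipStrategy strategy;
        private final String patternName;

        public AfterMatchSkipStrategy() {
            this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
        }

        public AfterMatchSkipStrategy(SkipStrategy strategy) {
            this(strategy, null);
        }

        public AfterMatchSkipStrategy(SkipStrategy strategy, String patternName) {
            if (strategy == null) {
                throw new IllegalArgumentException("strategy must not be null");
            }
            if ((strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) && patternName == null) {
                throw new IllegalArgumentException("A pattern name is required for " + strategy);
            }
            this.strategy = strategy;
            this.patternName = patternName;
        }

        public SkipStrategy getStrategy() {
            return strategy;
        }

        public String getPatternName() {
            return patternName;
        }

        @Override
        public String toString() {
            return "AfterMatchSkipStrategy{strategy=" + strategy + ", patternName=" + patternName + '}';
        }
    }
    ```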



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132894611
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -117,9 +119,11 @@
     		private Map<GroupPattern<T, ?>, Boolean> firstOfLoopMap = new HashMap<>();
     		private Pattern<T, ?> currentPattern;
     		private Pattern<T, ?> followingPattern;
    +		private AfterMatchSkipStrategy afterMatchSkipStrategy;
    --- End diff --
    
    This can be `final`.



[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129515022
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_ROW:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, outgoingEdges);
    +								if (callLevel > 0) {
    +									throw new RuntimeException("infinite loop! Will abort the match process, please rewrite your pattern query");
    +								}
    +								// feed current matched event to the state.
    +								Collection<ComputationState<T>> computationStates = computeNextStates(startComputationState, event, timestamp, callLevel++);
    --- End diff --
    
    This is because SKIP_TO_FIRST and SKIP_TO_LAST need to start the next match at the first or last event matched by the specified pattern. For example, take the event stream `a1, b1, c1, a2` and the pattern `(A B C)`. If the skip strategy is SKIP_TO_FIRST with pattern name `B`, we should create a new `startComputationState` after `b1` has been processed, and the next match should start at event `b1`. Therefore we need to manually feed `b1` to the newly created `startComputationState`.
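    The skip semantics described here can be modelled roughly as follows. This is a hypothetical sketch, not the NFA code in the diff. It assumes a match is represented as a map from pattern name to the stream positions of the events it matched. The class and enum names are illustrative only.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical model of the four skip strategies discussed in this PR; not Flink's implementation. */
final class SkipPositionSketch {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    /**
     * Computes the earliest stream position at which the next match may start.
     *
     * @param match       pattern name -> stream positions of the events it matched
     * @param strategy    the skip strategy to apply
     * @param patternName target pattern for SKIP_TO_FIRST / SKIP_TO_LAST (ignored otherwise)
     * @return the earliest position the next match may start at
     */
    static int nextStart(Map<String, List<Integer>> match, Strategy strategy, String patternName) {
        int first = Integer.MAX_VALUE;
        int last = Integer.MIN_VALUE;
        int count = 0;
        for (List<Integer> positions : match.values()) {
            for (int p : positions) {
                first = Math.min(first, p);
                last = Math.max(last, p);
                count++;
            }
        }
        if (count == 0) {
            throw new IllegalArgumentException("A match must contain at least one event");
        }
        switch (strategy) {
            case SKIP_TO_NEXT_EVENT:
                // the next match may start one event after this match's first event
                return first + 1;
            case SKIP_PAST_LAST_EVENT:
                // the next match may start only after this match's last event
                return last + 1;
            case SKIP_TO_FIRST:
            case SKIP_TO_LAST: {
                List<Integer> target = match.get(patternName);
                if (target == null || target.isEmpty()) {
                    throw new IllegalArgumentException("No events matched pattern " + patternName);
                }
                // the next match starts at the first (or last) event of the target pattern
                return strategy == Strategy.SKIP_TO_FIRST ? Collections.min(target) : Collections.max(target);
            }
            default:
                throw new IllegalStateException("Unknown strategy " + strategy);
        }
    }
}
```

    Take the stream `a1, b1, c1, a2` (positions 0 to 3) and the match A=[0], B=[1], C=[2]. SKIP_TO_FIRST with `B` yields 1, which is `b1`. SKIP_PAST_LAST_EVENT yields 3, which is `a2`.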


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129517254
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_<code>RPV</code> and
    + * SKIP_TO_LAST_<code>RPV</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +	// fields
    +	String rpv = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
    +		if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
    +			if (rpv == null) {
    +				throw new IllegalArgumentException("the rpv field can not be empty when SkipStrategy is " + strategy);
    +			}
    +		}
    +		this.strategy = strategy;
    +		this.rpv = rpv;
    +	}
    +
    +	public SkipStrategy getStrategy() {
    +		return strategy;
    +	}
    +
    +	public String getRpv() {
    +		return rpv;
    +	}
    +
    +	@Override
    +	public String toString() {
    +		return "AfterMatchStrategy{" +
    +			"strategy=" + strategy +
    +			", rpv=" + rpv +
    +			'}';
    +	}
    +
    +	/**
    +	 * Skip Strategy Enum.
    +	 */
    +	public enum SkipStrategy{
    +		SKIP_TO_NEXT_ROW,
    +		SKIP_PAST_LAST_ROW,
    +		SKIP_TO_FIRST,
    +		SKIP_TO_LAST
    +	}
    +
    +	/**
    +	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
    +	 */
    +	public static class AfterMatchSkipStrategyConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
    +
    +		private static final int VERSION = 1;
    +
    +		/**
    +		 * This empty constructor is required for deserializing the configuration.
    +		 */
    +		public AfterMatchSkipStrategyConfigSnapshot() {
    +		}
    +
    +		public AfterMatchSkipStrategyConfigSnapshot(
    +			TypeSerializer<SkipStrategy> enumSerializer,
    +			TypeSerializer<String> stringSerializer) {
    +
    +			super(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public int getVersion() {
    +			return VERSION;
    +		}
    +	}
    +
    +	/**
    +	 *  A {@link TypeSerializer} for the {@link AfterMatchSkipStrategy}.
    +	 */
    +	public static class AfterMatchSkipStrategySerializer extends TypeSerializer<AfterMatchSkipStrategy> {
    +
    +		private final TypeSerializer<SkipStrategy> enumSerializer;
    +		private final TypeSerializer<String> stringSerializer;
    +
    +		public AfterMatchSkipStrategySerializer(TypeSerializer<SkipStrategy> enumSerializer, TypeSerializer<String> stringSerializer) {
    +			this.enumSerializer = enumSerializer;
    +			this.stringSerializer = stringSerializer;
    +		}
    +
    +		@Override
    +		public boolean isImmutableType() {
    +			return false;
    +		}
    +
    +		@Override
    +		public TypeSerializer<AfterMatchSkipStrategy> duplicate() {
    +			return new AfterMatchSkipStrategySerializer(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy createInstance() {
    +			return new AfterMatchSkipStrategy();
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy copy(AfterMatchSkipStrategy from) {
    +			try {
    +				ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +				ObjectOutputStream oos = new ObjectOutputStream(baos);
    +
    +				serialize(from, new DataOutputViewStreamWrapper(oos));
    +
    +				oos.close();
    +				baos.close();
    +
    +				byte[] data = baos.toByteArray();
    +
    +				ByteArrayInputStream bais = new ByteArrayInputStream(data);
    +				ObjectInputStream ois = new ObjectInputStream(bais);
    +
    +				AfterMatchSkipStrategy copy = deserialize(new DataInputViewStreamWrapper(ois));
    +				ois.close();
    +				bais.close();
    +
    +				return copy;
    +			} catch (IOException e) {
    +				throw new RuntimeException("Could not copy AfterMatchSkipStrategy.", e);
    +			}
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy copy(AfterMatchSkipStrategy from, AfterMatchSkipStrategy reuse) {
    +			return copy(from);
    +		}
    +
    +		@Override
    +		public int getLength() {
    +			return -1;
    +		}
    +
    +		@Override
    +		public void serialize(AfterMatchSkipStrategy record, DataOutputView target) throws IOException {
    +			enumSerializer.serialize(record.getStrategy(), target);
    +			stringSerializer.serialize(record.getRpv(), target);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy deserialize(DataInputView source) throws IOException {
    +			SkipStrategy skipStrategy = enumSerializer.deserialize(source);
    +			String rpv = stringSerializer.deserialize(source);
    +			return new AfterMatchSkipStrategy(skipStrategy, rpv);
    +		}
    +
    +		@Override
    +		public AfterMatchSkipStrategy deserialize(AfterMatchSkipStrategy reuse, DataInputView source) throws IOException {
    +			return deserialize(source);
    +		}
    +
    +		@Override
    +		public void copy(DataInputView source, DataOutputView target) throws IOException {
    +			SkipStrategy skipStrategy = enumSerializer.deserialize(source);
    +			enumSerializer.serialize(skipStrategy, target);
    +			String rpv = stringSerializer.deserialize(source);
    +			stringSerializer.serialize(rpv, target);
    +		}
    +
    +		@Override
    +		public boolean equals(Object obj) {
    +			return obj == this ||
    +				(obj != null && obj.getClass().equals(getClass()));
    +		}
    +
    +		@Override
    +		public boolean canEqual(Object obj) {
    +			return true;
    +		}
    +
    +		@Override
    +		public int hashCode() {
    +			return 37 * enumSerializer.hashCode() + stringSerializer.hashCode();
    +		}
    +
    +		@Override
    +		public TypeSerializerConfigSnapshot snapshotConfiguration() {
    +			return new AfterMatchSkipStrategyConfigSnapshot(enumSerializer, stringSerializer);
    +		}
    +
    +		@Override
    +		public CompatibilityResult<AfterMatchSkipStrategy> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
    +			return CompatibilityResult.compatible();
    --- End diff --
    
    done


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132174353
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep;
    +
    +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
    +import org.apache.flink.cep.pattern.MalformedPatternException;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * After match skip tests.
    + */
    +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
    --- End diff --
    
    Rewrite the test in the style of `NFAITCase` so that it exercises a smaller part of the library.


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132895822
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,112 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_EVENT,
    + * SKIP_TO_NEXT_EVENT,
    + * SKIP_TO_FIRST_<code>PATTERN</code> and
    + * SKIP_TO_LAST_<code>PATTERN</code>.
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_EVENT;
    +
    +	// pattern name to skip to
    +	String patternName = null;
    +
    +	/**
    +	 * Skip to first *pattern*.
    +	 * @param patternName the pattern name to skip to
    +	 * @return
    --- End diff --
    
    The `@return` javadoc is missing on all of the methods. Also add a brief description of what each one implies.
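    Below is one possible shape for the requested Javadoc. It is sketched on hypothetical static factory methods; the class, enum and method names are placeholders, not the API in this PR. Each method gets a one-line description of the strategy's effect, plus `@return`, and `@param` where the method takes a pattern name.

```java
import java.util.Objects;

/** Hypothetical illustration of documented factory methods; names are placeholders, not Flink's API. */
final class AfterMatchSkipStrategySketch {

    enum SkipStrategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;
    private final String patternName; // only used by SKIP_TO_FIRST and SKIP_TO_LAST

    private AfterMatchSkipStrategySketch(SkipStrategy strategy, String patternName) {
        this.strategy = strategy;
        this.patternName = patternName;
    }

    /**
     * After a match, the next match may start at the event following the first event of this match.
     *
     * @return a strategy that skips to the next event
     */
    static AfterMatchSkipStrategySketch skipToNextEvent() {
        return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    }

    /**
     * After a match, the next match may start only after the last event of this match.
     *
     * @return a strategy that skips past the last event of the match
     */
    static AfterMatchSkipStrategySketch skipPastLastEvent() {
        return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_PAST_LAST_EVENT, null);
    }

    /**
     * After a match, the next match may start at the first event matched by the given pattern.
     *
     * @param patternName the pattern name to skip to
     * @return a strategy that skips to the first event of {@code patternName}
     */
    static AfterMatchSkipStrategySketch skipToFirst(String patternName) {
        return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_TO_FIRST, Objects.requireNonNull(patternName, "patternName"));
    }

    /**
     * After a match, the next match may start at the last event matched by the given pattern.
     *
     * @param patternName the pattern name to skip to
     * @return a strategy that skips to the last event of {@code patternName}
     */
    static AfterMatchSkipStrategySketch skipToLast(String patternName) {
        return new AfterMatchSkipStrategySketch(SkipStrategy.SKIP_TO_LAST, Objects.requireNonNull(patternName, "patternName"));
    }

    SkipStrategy getStrategy() {
        return strategy;
    }

    String getPatternName() {
        return patternName;
    }
}
```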


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132168886
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    --- End diff --
    
    Just use `this.afterMatchSkipStrategy`


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    Thanks for your reviews, @dawidwys! I'll update the docs in follow-up commits.


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r135465365
  
    --- Diff: docs/dev/libs/cep.md ---
    @@ -1250,6 +1250,104 @@ pattern.within(Time.seconds(10))
     
     </div>
     
    +### After Match Skip Strategy
    +
    +For a given pattern, there can be many successful matches as data stream flows. In order to control how to restart the match process after a successful match, we need to specify the skip strategy called `AfterMatchSkipStrategy`. There're four types of skip strategies, listed as follows:
    --- End diff --
    
    That is not true. We do not restart the match process at all. It just controls which results are discarded. I will change it during merge, if you are ok with it.
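    The point here is that the strategy only decides which results are discarded. The hypothetical sketch below illustrates this by filtering matches that have already been found, rather than restarting matching. It assumes that each match is the ascending list of its event ids, that ids grow with stream order, and that candidates arrive in emission order. The names are illustrative, not Flink's.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical sketch: a skip strategy as a filter over already-found matches. */
final class SkipFilterSketch {

    /** SKIP_PAST_LAST_EVENT: the next kept match must start after this match's last event. */
    static final ToIntFunction<List<Integer>> SKIP_PAST_LAST_EVENT = m -> m.get(m.size() - 1) + 1;

    /** SKIP_TO_NEXT_EVENT: the next kept match must start after this match's first event. */
    static final ToIntFunction<List<Integer>> SKIP_TO_NEXT_EVENT = m -> m.get(0) + 1;

    static List<List<Integer>> apply(List<List<Integer>> candidates, ToIntFunction<List<Integer>> nextStart) {
        List<List<Integer>> kept = new ArrayList<>();
        int minStart = Integer.MIN_VALUE;
        for (List<Integer> match : candidates) {
            if (match.get(0) >= minStart) {
                kept.add(match);
                minStart = nextStart.applyAsInt(match);
            }
            // otherwise the match is simply discarded; matching itself is never restarted
        }
        return kept;
    }
}
```

    Consider the candidates `1,2,3`, `2,3,4`, `3,4,5` and `4,5,6`, produced by `times(3)` over six `a` events. SKIP_PAST_LAST_EVENT keeps `1,2,3` and `4,5,6`, while SKIP_TO_NEXT_EVENT keeps all four. This agrees with the expected output of `testSkipPastLast` and `testSkipToNext` in `AfterMatchSkipITCase`.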


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129303049
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_ROW:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, outgoingEdges));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getRpv()) &&
    +								!nextState.getName().equals(currentState.getName())) {
    +								ComputationState<T> startComputationState = createStartComputationState(computationState, outgoingEdges);
    --- End diff --
    
    Passing `outgoingEdges` will not work in this case. Previously we assumed that the new `start` state was always created in the starting state. That is why the version for a new run could be calculated from the outgoing edges of the start state.
    
    Here, `outgoingEdges` do not correspond to the starting state.


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by litrain <gi...@git.apache.org>.
Github user litrain commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    @dawidwys @yestinchen Thanks for the discussion. I am also working on the empty match issue now; please have a look at https://issues.apache.org/jira/browse/FLINK-7292.


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    Sorry for the late response. I think this feature is very useful, and I agree that we should clearly define the semantics of each skip strategy. There is already some discussion in FLINK-3703 that we can refer to. I will look at this PR and FLINK-3703 over the next two days and post my thoughts.


[GitHub] flink issue #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP function in CE...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4331
  
    Still missing some tests:
    
    - ensuring the `NFA#extractCurrentMatches` returns patterns in order.
    - skip to first/last with `oneOrMore`
    
    Docs are also missing.


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132169303
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +				Pattern<T, ?> pattern = currentPattern;
    +				while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +					pattern = pattern.getPrevious();
    +				}
    +				// pattern name match check.
    +				if (pattern == null) {
    --- End diff --
    
    This check can never succeed. If `pattern.getPrevious()` returns `null`, the `while` loop throws a `NullPointerException` before this check is reached.
    
    There is also no test for this case.
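    A null-safe version of the lookup could fold the null check into the loop condition. The walk then stops at the beginning of the pattern chain, and the error path becomes reachable. This sketch uses a hypothetical `PatternNode` in place of `Pattern`. To stay self-contained, it throws `IllegalArgumentException` where the PR's test expects `MalformedPatternException`.

```java
/** Hypothetical minimal pattern node: only a name and a link to the previous pattern. */
final class PatternNode {
    final String name;
    final PatternNode previous;

    PatternNode(String name, PatternNode previous) {
        this.name = name;
        this.previous = previous;
    }
}

final class SkipTargetCheck {

    /**
     * Walks back from the last pattern looking for the skip target.
     * The null check is part of the loop condition, so the walk ends at the
     * start of the chain instead of throwing NullPointerException.
     */
    static PatternNode findPattern(PatternNode last, String patternName) {
        PatternNode pattern = last;
        while (pattern != null && !pattern.name.equals(patternName)) {
            pattern = pattern.previous;
        }
        if (pattern == null) {
            throw new IllegalArgumentException("Skip target pattern not found: " + patternName);
        }
        return pattern;
    }
}
```

    A test for the missing case can then call the check with a name that does not occur in the pattern and assert that it throws.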


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132175365
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep;
    +
    +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
    +import org.apache.flink.cep.pattern.MalformedPatternException;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * After match skip tests.
    + */
    +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
    +
    +	private String resultPath;
    +	private String expected;
    +
    +	private String lateEventPath;
    +	private String expectedLateEvents;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	@Before
    +	public void before() throws Exception {
    +		resultPath = tempFolder.newFile().toURI().toString();
    +		expected = "";
    +
    +		lateEventPath = tempFolder.newFile().toURI().toString();
    +		expectedLateEvents = "";
    +	}
    +
    +	@After
    +	public void after() throws Exception {
    +		compareResultsByLinesInMemory(expected, resultPath);
    +		compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
    +	}
    +
    +	private PatternSelectFunction<Event, String> newIdSelectFunction(String ... names) {
    +		return new PatternSelectFunction<Event, String>() {
    +
    +			@Override
    +			public String select(Map<String, List<Event>> pattern) {
    +				StringBuilder builder = new StringBuilder();
    +				for (String name: names) {
    +					for (Event e : pattern.get(name)) {
    +						builder.append(e.getId()).append(",");
    +					}
    +				}
    +				return builder.toString();
    +			}
    +		};
    +	}
    +
    +	@Test
    +	public void testSkipToNext() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n2,3,4,\n3,4,5,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipPastLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToFirst() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(
    +				AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "end"))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n3,4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n4,5,6,7,";
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToLastWithEmptyException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "middle")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			}
    +		).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToLastWithInfiniteLoopException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "start")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			}
    +		).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToFirstWithInfiniteLoopException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "middle")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("x");
    +			}
    +		}).oneOrMore().optional().next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("b");
    +				}
    +			}
    +		).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("middle", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipPastLast2() throws Exception {
    --- End diff --
    
    same as above
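To sanity-check the expected strings in these tests, here is a small standalone simulation. It assumes a strictly contiguous pattern in which each named part matches a fixed number of consecutive events. That holds in these tests because every event satisfies every condition and `times(n)` is combined with `next(...)`. After each match, the chosen skip strategy decides where the next match may start. `SkipSim`, `Part` and `run` are hypothetical names for illustration only. This is not Flink API and does not model the NFA. Under those assumptions it reproduces the expected output of `testSkipToFirst` and `testSkipToLast`, and of `testSkipToNext` and `testSkipPastLast` further down the thread.

```java
import java.util.ArrayList;
import java.util.List;

public class SkipSim {

    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    // One named part of a contiguous pattern that matches exactly `count` consecutive events.
    static final class Part {
        final String name;
        final int count;

        Part(String name, int count) {
            this.name = name;
            this.count = count;
        }
    }

    // Returns one comma-terminated line of event ids per match, e.g. "1,2,3,4,".
    static List<String> run(int[] ids, List<Part> parts, Strategy strategy, String target) {
        int length = 0;
        for (Part p : parts) {
            length += p.count;
        }
        List<String> matches = new ArrayList<>();
        int start = 0;
        while (start + length <= ids.length) {
            StringBuilder line = new StringBuilder();
            for (int i = start; i < start + length; i++) {
                line.append(ids[i]).append(',');
            }
            matches.add(line.toString());

            // First and last positions covered by the target part (-1 if it is absent).
            int targetFirst = -1;
            int targetLast = -1;
            int offset = start;
            for (Part p : parts) {
                if (p.name.equals(target)) {
                    targetFirst = offset;
                    targetLast = offset + p.count - 1;
                }
                offset += p.count;
            }

            int next;
            switch (strategy) {
                case SKIP_PAST_LAST_EVENT:
                    next = start + length;
                    break;
                case SKIP_TO_FIRST:
                    next = targetFirst;
                    break;
                case SKIP_TO_LAST:
                    next = targetLast;
                    break;
                default: // SKIP_TO_NEXT_EVENT
                    next = start + 1;
            }
            // Never restart at (or before) the current match start, which would loop forever.
            start = next > start ? next : start + 1;
        }
        return matches;
    }
}
```

For example, with seven events and `start` x2 followed by `end` x2, `SKIP_TO_FIRST` on `end` resumes at event 3 and `SKIP_TO_LAST` on `end` resumes at event 4. This is why the two tests differ only in their second match.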


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129517161
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_<code>RPV</code> and
    + * SKIP_TO_LAST_<code>RPV</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +	// fields
    +	String rpv = null;
    --- End diff --
    
    It stands for Row Pattern Variable. I have already renamed it to `patternName`, which I think is clearer.


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129517227
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_<code>RPV</code> and
    + * SKIP_TO_LAST_<code>RPV</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +	// fields
    +	String rpv = null;
    +
    +	public AfterMatchSkipStrategy(){
    +		this(SkipStrategy.SKIP_TO_NEXT_ROW, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy) {
    +		this(strategy, null);
    +	}
    +
    +	public AfterMatchSkipStrategy(SkipStrategy strategy, String rpv) {
    +		if (strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST) {
    +			if (rpv == null) {
    --- End diff --
    
    done
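For reference, here is a minimal sketch of how the constructor check in this hunk might look once `rpv` is renamed to `patternName` and `ROW` to `EVENT`. The two strategies that point at a pattern variable reject a missing name up front. `SkipStrategySketch` is a hypothetical stand-in, not the PR's class. Throwing `IllegalArgumentException` is also an assumption, and the PR's actual exception type and message may differ.

```java
import java.io.Serializable;
import java.util.Objects;

public class SkipStrategySketch implements Serializable {

    private static final long serialVersionUID = 1L;

    enum SkipStrategy { SKIP_PAST_LAST_EVENT, SKIP_TO_NEXT_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    private final SkipStrategy strategy;

    // Name of the pattern (the SQL "row pattern variable") used by SKIP_TO_FIRST / SKIP_TO_LAST.
    private final String patternName;

    public SkipStrategySketch() {
        this(SkipStrategy.SKIP_TO_NEXT_EVENT, null);
    }

    public SkipStrategySketch(SkipStrategy strategy) {
        this(strategy, null);
    }

    public SkipStrategySketch(SkipStrategy strategy, String patternName) {
        this.strategy = Objects.requireNonNull(strategy, "strategy");
        boolean needsName = strategy == SkipStrategy.SKIP_TO_FIRST || strategy == SkipStrategy.SKIP_TO_LAST;
        if (needsName && patternName == null) {
            throw new IllegalArgumentException(strategy + " requires a pattern name");
        }
        this.patternName = patternName;
    }

    public SkipStrategy getStrategy() {
        return strategy;
    }

    public String getPatternName() {
        return patternName;
    }
}
```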


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r134170217
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -557,8 +640,8 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     
     		if (computationState.isStartState()) {
     			int totalBranches = calculateIncreasingSelfState(
    -					outgoingEdges.getTotalIgnoreBranches(),
    -					outgoingEdges.getTotalTakeBranches());
    +				outgoingEdges.getTotalIgnoreBranches(),
    --- End diff --
    
    Unrelated change


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129296835
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    + * SKIP_TO_FIRST_<code>RPV</code> and
    + * SKIP_TO_LAST_<code>RPV</code>
    + * </p>
    + */
    +public class AfterMatchSkipStrategy implements Serializable {
    +
    +	// default strategy
    +	SkipStrategy strategy = SkipStrategy.SKIP_TO_NEXT_ROW;
    +
    +	// fields
    +	String rpv = null;
    --- End diff --
    
    What does `rpv` mean?


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132896150
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---
    @@ -120,13 +121,16 @@
     
     	private final EventComparator<IN> comparator;
     
    +	final AfterMatchSkipStrategy afterMatchSkipStrategy;
    +
     	public AbstractKeyedCEPPatternOperator(
    -			final TypeSerializer<IN> inputSerializer,
    --- End diff --
    
    Please revert the formatting changes (tabs).


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132173699
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -150,6 +160,59 @@ long getWindowTime() {
     		}
     
     		/**
    +		 * Check pattern after match skip strategy.
    +		 */
    +		private void checkPatternSkipStrategy() {
    +			AfterMatchSkipStrategy afterMatchSkipStrategy = currentPattern.getAfterMatchSkipStrategy();
    +			if (afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST ||
    +				afterMatchSkipStrategy.getStrategy() == AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST) {
    +				Pattern<T, ?> pattern = currentPattern;
    +				while (!pattern.getName().equals(afterMatchSkipStrategy.getPatternName())) {
    +					pattern = pattern.getPrevious();
    +				}
    +				// pattern name match check.
    +				if (pattern == null) {
    +					throw new MalformedPatternException("the pattern name specified in AfterMatchSkipStrategy " +
    +						"can not be found in the given Pattern");
    +				} else {
    +					// can not be used with optional states.
    +					if (pattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
    +						throw new MalformedPatternException("the AfterMatchSkipStrategy "
    +							+ afterMatchSkipStrategy.getStrategy() + " can not be used with optional pattern");
    +					}
    +				}
    +
    +				// start position check.
    +				if (pattern.getPrevious() == null) {
    --- End diff --
    
    On second thought about all these cases, I think you had a good point in one of your earlier proposals: simply fall back to `SKIP_TO_NEXT_EVENT`.
    
    Maybe we should allow all of these situations, but if we cannot discard anything within a match (e.g. the skip target is the first pattern, or the optional event is not present), then we discard nothing. In my opinion this is easier to understand than defining and maintaining all of those special cases. (As a side note, I think the exceptions SQL raises in these cases are just an implementation detail, since SQL keeps only a single partial match at a time.) What do you think?
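The proposed rule says that if the skip target cannot be used, nothing is discarded and the behaviour matches `SKIP_TO_NEXT_EVENT`. It can be sketched as a pure function over a finished match. The sketch makes three assumptions. The match is a map from pattern name to stream positions of its events. That map iterates in declaration order. An absent optional pattern has no entry. `FallbackSketch` and `resumePosition` are hypothetical names, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FallbackSketch {

    enum SkipStrategy { SKIP_PAST_LAST_EVENT, SKIP_TO_NEXT_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    // Returns the stream position of the earliest event from which a new match may start.
    // `match` maps pattern names to the positions of their events, in declaration order.
    static long resumePosition(Map<String, List<Long>> match, SkipStrategy strategy, String patternName) {
        List<Long> all = new ArrayList<>();
        for (List<Long> positions : match.values()) {
            all.addAll(positions);
        }
        long matchStart = all.get(0);
        long nextEvent = matchStart + 1; // the SKIP_TO_NEXT_EVENT behaviour

        switch (strategy) {
            case SKIP_PAST_LAST_EVENT:
                return all.get(all.size() - 1) + 1;
            case SKIP_TO_FIRST:
            case SKIP_TO_LAST: {
                List<Long> target = match.get(patternName);
                // Optional pattern absent from this match: discard nothing.
                if (target == null || target.isEmpty()) {
                    return nextEvent;
                }
                long pos = strategy == SkipStrategy.SKIP_TO_FIRST
                    ? target.get(0)
                    : target.get(target.size() - 1);
                // Skipping to the match start would rediscover the same match: discard nothing.
                return pos > matchStart ? pos : nextEvent;
            }
            default:
                return nextEvent;
        }
    }
}
```

With this rule, no pattern configuration needs to be rejected up front. A target that is missing, or that coincides with the match start, simply degrades to the next-event behaviour.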


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4331


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132898814
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
     		return Tuple2.of(result, timeoutResult);
     	}
     
    +	private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
    --- End diff --
    
    This method is very fragile as it makes the assumption that the different patterns appear in the match in the order that they were declared in the pattern declaration, e.g. for a pattern `A -> B -> C` it assumes that the partial matches will appear in the order [`matches for A`, `matches for B`, `matches for C`]. This is *NOT* correct as we use a `HashMap` which does not provide any ordering guarantees. 
    
    A quick fix would be to use a `LinkedHashMap` as the return type for the `extractCurrentMatches` and put the results there in the correct order.
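The point can be illustrated in a few lines. A `LinkedHashMap` iterates in insertion order. If results are inserted while walking the pattern names in declaration order, callers see `A`, `B`, `C` in that order, which a plain `HashMap` does not guarantee. The helper below is a hypothetical sketch, not the PR's `extractCurrentMatches`. It also includes the kind of order check a regression test could assert on.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OrderedMatchSketch {

    // Builds the match map by walking pattern names in declaration order.
    // LinkedHashMap preserves this insertion order when iterating.
    static Map<String, List<Integer>> collect(List<String> declaredNames, Map<String, List<Integer>> eventsByName) {
        Map<String, List<Integer>> ordered = new LinkedHashMap<>();
        for (String name : declaredNames) {
            List<Integer> events = eventsByName.get(name);
            if (events != null) {
                ordered.put(name, events);
            }
        }
        return ordered;
    }

    // True if the map's keys appear in the same relative order as in the declaration.
    static boolean inDeclarationOrder(Map<String, ?> match, List<String> declaredNames) {
        List<String> expected = new ArrayList<>(declaredNames);
        expected.retainAll(match.keySet());
        return new ArrayList<>(match.keySet()).equals(expected);
    }
}
```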


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r130264936
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -551,18 +579,47 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
     								nextVersion,
     								startTimestamp);
     					}
    +
    +					switch (skipStrategy.getStrategy()) {
    +						case SKIP_PAST_LAST_EVENT:
    +							if (nextState.isFinal()) {
    +								resultingComputationStates.add(createStartComputationState(computationState, event));
    +							}
    +							break;
    +						case SKIP_TO_FIRST:
    +							if (nextState.getName().equals(skipStrategy.getPatternName()) &&
    --- End diff --
    
    This should use `NFAStateNameHandler.getOriginalNameFromInternal()` to compare state names.
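The reasoning is that the compiler may give a state an internal name that differs from the user-facing pattern name. A plain `equals` against the name stored in the strategy can then miss. The minimal sketch below assumes internal names of the form `<originalName>:<suffix>`. The `:` delimiter and the helper names are assumptions for illustration. The real mapping is whatever `NFAStateNameHandler` implements.

```java
public class StateNameSketch {

    // Assumed delimiter between the original pattern name and a uniqueness suffix.
    private static final String DELIM = ":";

    // Hypothetical stand-in for mapping an internal state name back to the pattern name.
    static String originalName(String internalName) {
        int idx = internalName.indexOf(DELIM);
        return idx < 0 ? internalName : internalName.substring(0, idx);
    }

    // Compares a state's (possibly suffixed) internal name with the user-facing pattern name.
    static boolean isStateOf(String internalStateName, String patternName) {
        return originalName(internalStateName).equals(patternName);
    }
}
```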


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by yestinchen <gi...@git.apache.org>.
Github user yestinchen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129516786
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/AfterMatchSkipStrategy.java ---
    @@ -0,0 +1,241 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.api.common.typeutils.CompatibilityResult;
    +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
    +import org.apache.flink.api.common.typeutils.TypeSerializer;
    +import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
    +import org.apache.flink.core.memory.DataInputView;
    +import org.apache.flink.core.memory.DataInputViewStreamWrapper;
    +import org.apache.flink.core.memory.DataOutputView;
    +import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
    +
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
    +import java.io.IOException;
    +import java.io.ObjectInputStream;
    +import java.io.ObjectOutputStream;
    +import java.io.Serializable;
    +
    +
    +/**
    + * Indicate the skip strategy after a match process.
    + * <p>There're four kinds of strategies:
    + * SKIP_PAST_LAST_ROW,
    + * SKIP_TO_NEXT_ROW,
    --- End diff --
    
    I changed `ROW` to `EVENT`. Is that better?


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r133667042
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -340,6 +362,65 @@ public void resetNFAChanged() {
     		return Tuple2.of(result, timeoutResult);
     	}
     
    +	private void discardComputationStatesAccordingToStrategy(Queue<ComputationState<T>> computationStates,
    --- End diff --
    
    The changes in the next commit are not enough. The `Map` type should also be changed in `SharedBuffer#extractPatterns`. Please also provide tests for this behaviour (returning results in pattern order), so that it cannot be changed by mistake.


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r129296465
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java ---
    @@ -36,6 +40,14 @@
     	 * @return Resulting pattern stream
     	 */
     	public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern) {
    -		return new PatternStream<>(input, pattern);
    +		return new PatternStream<>(input, pattern, skipStrategy);
    +	}
    +
    +	/**
    +	 * Set the pattern's skip strategy after match.
    +	 * @param afterMatchSkipStrategy the skip strategy to use.
    +	 */
    +	public static void setAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
    --- End diff --
    
    As @dianfu said in the corresponding JIRA, the `AfterMatchSkipStrategy` should be part of the `Pattern` not `PatternStream`.
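One way to see the problem with a static `CEP.setAfterMatchSkipStrategy(...)` is that it is process-wide mutable state. Every pattern processed afterwards picks up whatever was set last, regardless of which pattern it belongs to. Carrying the strategy on the pattern itself keeps it scoped to that one pattern, which is what the tests in this thread do with `Pattern.begin("start", new AfterMatchSkipStrategy(...))`. The toy classes below are hypothetical and only contrast the two designs.

```java
public class StrategyScopeSketch {

    enum Skip { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT }

    // Design 1: a global setter whose value is shared by every pattern.
    static final class GlobalCep {
        private static Skip current = Skip.SKIP_TO_NEXT_EVENT;

        static void setSkip(Skip skip) {
            current = skip;
        }

        // Ignores the pattern entirely: the last call to setSkip wins.
        static Skip skipFor(ToyPattern pattern) {
            return current;
        }
    }

    // Design 2: the strategy travels with the pattern that declared it.
    static final class ToyPattern {
        final String name;
        final Skip skip;

        private ToyPattern(String name, Skip skip) {
            this.name = name;
            this.skip = skip;
        }

        static ToyPattern begin(String name, Skip skip) {
            return new ToyPattern(name, skip);
        }
    }
}
```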


[GitHub] flink pull request #4331: [FLINK-7169][CEP] Support AFTER MATCH SKIP functio...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4331#discussion_r132175333
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/AfterMatchSkipITCase.java ---
    @@ -0,0 +1,431 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep;
    +
    +import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
    +import org.apache.flink.cep.pattern.MalformedPatternException;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.streaming.api.datastream.DataStream;
    +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
    +
    +import org.junit.After;
    +import org.junit.Before;
    +import org.junit.Rule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import java.util.List;
    +import java.util.Map;
    +
    +/**
    + * After match skip tests.
    + */
    +public class AfterMatchSkipITCase extends StreamingMultipleProgramsTestBase {
    +
    +	private String resultPath;
    +	private String expected;
    +
    +	private String lateEventPath;
    +	private String expectedLateEvents;
    +
    +	@Rule
    +	public TemporaryFolder tempFolder = new TemporaryFolder();
    +
    +	@Before
    +	public void before() throws Exception {
    +		resultPath = tempFolder.newFile().toURI().toString();
    +		expected = "";
    +
    +		lateEventPath = tempFolder.newFile().toURI().toString();
    +		expectedLateEvents = "";
    +	}
    +
    +	@After
    +	public void after() throws Exception {
    +		compareResultsByLinesInMemory(expected, resultPath);
    +		compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
    +	}
    +
    +	private PatternSelectFunction<Event, String> newIdSelectFunction(String ... names) {
    +		return new PatternSelectFunction<Event, String>() {
    +
    +			@Override
    +			public String select(Map<String, List<Event>> pattern) {
    +				StringBuilder builder = new StringBuilder();
    +				for (String name: names) {
    +					for (Event e : pattern.get(name)) {
    +						builder.append(e.getId()).append(",");
    +					}
    +				}
    +				return builder.toString();
    +			}
    +		};
    +	}
    +
    +	@Test
    +	public void testSkipToNext() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_NEXT_EVENT))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n2,3,4,\n3,4,5,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipPastLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "a", 0.0),
    +			new Event(2, "a", 0.0),
    +			new Event(3, "a", 0.0),
    +			new Event(4, "a", 0.0),
    +			new Event(5, "a", 0.0),
    +			new Event(6, "a", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(AfterMatchSkipStrategy.SkipStrategy.SKIP_PAST_LAST_EVENT)).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(3);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,\n4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToFirst() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start",
    +			new AfterMatchSkipStrategy(
    +				AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_FIRST, "end"))
    +			.where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n3,4,5,6,";
    +
    +		env.execute();
    +	}
    +
    +	@Test
    +	public void testSkipToLast() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "ab", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "ab", 0.0),
    +			new Event(5, "ab", 0.0),
    +			new Event(6, "ab", 0.0),
    +			new Event(7, "ab", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "end")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).times(2).next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("b");
    +			}
    +		}).times(2);
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		// expected sequence of matching event ids
    +		expected = "1,2,3,4,\n4,5,6,7,";
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToLastWithEmptyException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "middle")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			}
    +		).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    +	public void testSkipToLastWithInfiniteLoopException() throws Exception {
    +		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +		DataStream<Event> input = env.fromElements(
    +			new Event(1, "ab", 0.0),
    +			new Event(2, "c", 0.0),
    +			new Event(3, "ab", 0.0),
    +			new Event(4, "c", 0.0)
    +		);
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start", new AfterMatchSkipStrategy(
    +			AfterMatchSkipStrategy.SkipStrategy.SKIP_TO_LAST, "start")).where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("a");
    +			}
    +		}).next("middle").where(
    +			new SimpleCondition<Event>() {
    +
    +				@Override
    +				public boolean filter(Event value) throws Exception {
    +					return value.getName().contains("d");
    +				}
    +			}
    +		).oneOrMore().optional().next("end").where(new SimpleCondition<Event>() {
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().contains("c");
    +			}
    +		});
    +		DataStream<String> result = CEP.pattern(input, pattern).select(newIdSelectFunction("start", "end"));
    +
    +		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
    +
    +		env.execute();
    +	}
    +
    +	@Test(expected = MalformedPatternException.class)
    --- End diff --
    
    same as above
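The `expected` strings in testSkipToNext, testSkipPastLast, testSkipToFirst and testSkipToLast follow from simple index arithmetic. Below is a minimal, Flink-free sketch of that arithmetic. It assumes every input event satisfies every condition, which holds for those test inputs (all names are "a" or "ab"), so each match is just a run of consecutive events. `SkipStrategySketch`, its `Strategy` enum and `simulate` are hypothetical names used only for illustration. They are not part of Flink or of this PR, and they do not claim to reproduce how the PR's NFA implements the strategies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation, not Flink code: assumes every event satisfies every condition,
// so a match is just a run of consecutive events whose length is the sum of the stage sizes.
final class SkipStrategySketch {

    // Illustrative names mirroring AfterMatchSkipStrategy.SkipStrategy in the tests above.
    enum Strategy { SKIP_TO_NEXT_EVENT, SKIP_PAST_LAST_EVENT, SKIP_TO_FIRST, SKIP_TO_LAST }

    // eventCount: events with ids 1..eventCount
    // stageSizes: times(n) per stage, e.g. {2, 2} for begin("start").times(2).next("end").times(2)
    // stage: index of the stage named in SKIP_TO_FIRST / SKIP_TO_LAST (ignored otherwise)
    // Returns comma-terminated id lists joined by newlines, the same shape as `expected`.
    static String simulate(int eventCount, int[] stageSizes, Strategy strategy, int stage) {
        int matchLength = 0;
        for (int size : stageSizes) {
            matchLength += size;
        }
        int stageOffset = 0; // position of the referenced stage's first event within a match
        for (int i = 0; i < stage; i++) {
            stageOffset += stageSizes[i];
        }

        List<String> matches = new ArrayList<>();
        int start = 0; // 0-based index of the current match's first event
        while (start + matchLength <= eventCount) {
            StringBuilder ids = new StringBuilder();
            for (int i = start; i < start + matchLength; i++) {
                ids.append(i + 1).append(',');
            }
            matches.add(ids.toString());

            int next;
            switch (strategy) {
                case SKIP_TO_NEXT_EVENT:
                    next = start + 1;
                    break;
                case SKIP_PAST_LAST_EVENT:
                    next = start + matchLength;
                    break;
                case SKIP_TO_FIRST:
                    next = start + stageOffset;
                    break;
                case SKIP_TO_LAST:
                    next = start + stageOffset + stageSizes[stage] - 1;
                    break;
                default:
                    throw new IllegalArgumentException("unknown strategy: " + strategy);
            }
            // Resuming at (or before) the match's own first event would find the same match again forever.
            if (next <= start) {
                throw new IllegalStateException("skip target does not advance past event " + (start + 1));
            }
            start = next;
        }
        return String.join("\n", matches);
    }

    public static void main(String[] args) {
        System.out.println(simulate(6, new int[] {3}, Strategy.SKIP_TO_NEXT_EVENT, -1));
        System.out.println(simulate(6, new int[] {3}, Strategy.SKIP_PAST_LAST_EVENT, -1));
        System.out.println(simulate(7, new int[] {2, 2}, Strategy.SKIP_TO_FIRST, 1));
        System.out.println(simulate(7, new int[] {2, 2}, Strategy.SKIP_TO_LAST, 1));
    }
}
```

Under these assumptions, `main` prints the same id lists as the four `expected` strings. For example, SKIP_TO_FIRST on "end" resumes at event 3 after matching 1,2,3,4, while SKIP_TO_LAST on "end" resumes at event 4. The `next <= start` guard only approximates why testSkipToLastWithInfiniteLoopException expects a MalformedPatternException: targeting the single-event "start" stage never moves past the match's own first event. How the PR actually detects this case is not shown in this excerpt.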

