Posted to issues@flink.apache.org by dianfu <gi...@git.apache.org> on 2017/06/21 08:44:54 UTC

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

GitHub user dianfu opened a pull request:

    https://github.com/apache/flink/pull/4153

    [FLINK-6927] [cep] Support pattern group in CEP



You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink FLINK-6927

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4153.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4153
    
----
commit 6b295b82ddcda542ef0832bdf8405c4bad975882
Author: Dian Fu <fu...@alibaba-inc.com>
Date:   2017-06-21T08:41:21Z

    [FLINK-6927] [cep] Support pattern group in CEP

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
GitHub user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125012573
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    --- End diff --
    
    I am missing a test or two for `... followedByAny (...).oneOrMore ....`.
    Also, a test or two with `notNext` and `notFollowedBy` after group patterns would be helpful, I think.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
GitHub user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125173828
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
     		}
     
     		/**
    +		 * Create all the states for the group pattern.
    +		 *
    +		 * @param groupPattern the group pattern to create the states for
    +		 * @param sinkState the state that the group pattern being converted should point to
    +		 * @param proceedState the state that the group pattern being converted should proceed to
    +		 * @param isOptional whether the group pattern being converted is optional
    +		 * @return the first state of the states of the group pattern
    +		 */
    +		private State<T> createGroupPatternState(
    +			final GroupPattern<T, ?> groupPattern,
    +			final State<T> sinkState,
    +			final State<T> proceedState,
    +			final boolean isOptional) {
    +			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
    +
    +			Pattern<T, ?> oldCurrentPattern = currentPattern;
    +			Pattern<T, ?> oldFollowingPattern = followingPattern;
    +			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
    +			try {
    --- End diff --
    
    updated.
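
The excerpt above saves `currentPattern`, `followingPattern` and `currentGroupPattern` into local variables and then opens a `try` block; the rest of the method is cut off in the quoted diff. A minimal sketch of the save-and-restore idiom this appears to set up is shown below, assuming the `try` is closed by a `finally` that puts the saved values back. The class `ScopedStateSketch` and all of its members are hypothetical and are not taken from `NFACompiler`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a compiler with a mutable cursor; not Flink code. */
class ScopedStateSketch {
    // Plays the role of a field such as currentPattern in the excerpt.
    private String current = "outer";
    // Records which scopes were entered, for illustration only.
    private final List<String> visited = new ArrayList<>();

    /** Points the cursor at a nested group while it is processed, then restores it. */
    String compileGroup(String groupName, boolean fail) {
        final String oldCurrent = current; // save the enclosing scope
        try {
            current = groupName;
            visited.add(current);
            if (fail) {
                throw new IllegalStateException("simulated failure inside " + groupName);
            }
            return "first-state-of-" + groupName;
        } finally {
            current = oldCurrent; // restored on normal return and on exception
        }
    }

    String current() {
        return current;
    }

    List<String> visited() {
        return visited;
    }

    public static void main(String[] args) {
        ScopedStateSketch sketch = new ScopedStateSketch();
        System.out.println(sketch.compileGroup("group", false));
        System.out.println(sketch.current());
    }
}
```

The point of the `finally` is that the enclosing pattern's fields are intact again no matter how the nested group's processing ends, so the caller can continue as if the group were a single state.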



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
GitHub user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125178823
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    --- End diff --
    
    updated.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
GitHub user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125005837
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---
    @@ -153,9 +153,8 @@ public int hashCode() {
     		private final int to;
     
     		private Times(int from, int to) {
    -			Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
    +			Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
    --- End diff --
    
    Please move this change to a separate JIRA issue/PR.
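
For readers following the diff above: the changed line tightens the lower bound of `Times` from `from >= 0` to `from > 0`, so a range starting at zero is rejected. A minimal plain-Java sketch of that behaviour follows. `TimesBoundsSketch` and its local `checkArgument` helper are hypothetical stand-ins rather than Flink's classes, and the `to >= from` check is an assumption that is not visible in the quoted excerpt.

```java
/** Hypothetical stand-in for the bounds check in the diff; not Flink's Quantifier.Times. */
final class TimesBoundsSketch {
    private final int from;
    private final int to;

    private TimesBoundsSketch(int from, int to) {
        // Stricter variant from the "+" line of the diff: zero is no longer accepted.
        checkArgument(from > 0, "The from should be a positive number greater than 0.");
        // Assumption (not shown in the excerpt): the upper bound may not be below the lower one.
        checkArgument(to >= from, "The to should be greater than or equal to from: " + from + ".");
        this.from = from;
        this.to = to;
    }

    /** Plain-Java argument check that throws IllegalArgumentException on failure. */
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }

    static TimesBoundsSketch of(int from, int to) {
        return new TimesBoundsSketch(from, to);
    }

    int getFrom() {
        return from;
    }

    int getTo() {
        return to;
    }

    public static void main(String[] args) {
        System.out.println(of(2, 4).getFrom());
        try {
            of(0, 3); // rejected under the stricter check
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under the older `from >= 0` check the call `of(0, 3)` would have been accepted, which is the behavioural difference the review comment asks to handle in its own issue.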



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by asfgit <gi...@git.apache.org>.
GitHub user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4153



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
GitHub user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125176853
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesCombinations() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a1, b1, a3, b3, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNextZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 3.0);
    +		Event b2 = new Event(44, "b", 3.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 3.0);
    +		Event d = new Event(47, "d", 1.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1L));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
    +		inputEvents.add(new StreamRecord<>(a1, 3L));
    +		inputEvents.add(new StreamRecord<>(b1, 4L));
    +		inputEvents.add(new StreamRecord<>(a2, 5L));
    +		inputEvents.add(new StreamRecord<>(b2, 6L));
    +		inputEvents.add(new StreamRecord<>(a3, 7L));
    +		inputEvents.add(new StreamRecord<>(b3, 8L));
    +		inputEvents.add(new StreamRecord<>(d, 9L));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 6215754202506583964L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).next(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().consecutive().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 7056763917392056548L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNest() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event d = new Event(40, "d", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event c1 = new Event(43, "c", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event c2 = new Event(45, "c", 4.0);
    +		Event e = new Event(46, "e", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(d, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(c1, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(c2, 6));
    +		inputEvents.add(new StreamRecord<>(e, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		}).followedBy("middle3").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		})).oneOrMore().optional()).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(d, e),
    +			Lists.newArrayList(d, a1, e),
    +			Lists.newArrayList(d, a1, b1, c1, e),
    +			Lists.newArrayList(d, a1, b1, c1, b2, c2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNestTimes() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event d = new Event(40, "d", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event c1 = new Event(43, "c", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event c2 = new Event(45, "c", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event c3 = new Event(47, "c", 4.0);
    +		Event a2 = new Event(48, "a", 2.0);
    +		Event b4 = new Event(49, "b", 3.0);
    +		Event c4 = new Event(50, "c", 4.0);
    +		Event b5 = new Event(51, "b", 5.0);
    +		Event c5 = new Event(52, "c", 4.0);
    +		Event b6 = new Event(53, "b", 5.0);
    +		Event c6 = new Event(54, "c", 4.0);
    +		Event e = new Event(55, "e", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(d, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(c1, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(c2, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(c3, 8));
    +		inputEvents.add(new StreamRecord<>(a2, 9));
    +		inputEvents.add(new StreamRecord<>(b4, 10));
    +		inputEvents.add(new StreamRecord<>(c4, 11));
    +		inputEvents.add(new StreamRecord<>(b5, 12));
    +		inputEvents.add(new StreamRecord<>(c5, 13));
    +		inputEvents.add(new StreamRecord<>(b6, 14));
    +		inputEvents.add(new StreamRecord<>(c6, 15));
    +		inputEvents.add(new StreamRecord<>(e, 16));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		}).followedBy("middle3").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		})).times(3)).times(0, 2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(d, e),
    +			Lists.newArrayList(d, a1, b1, c1, b2, c2, b3, c3, e),
    +			Lists.newArrayList(d, a2, b4, c4, b5, c5, b6, c6, e),
    +			Lists.newArrayList(d, a1, b1, c1, b2, c2, b3, c3, a2, b4, c4, b5, c5, b6, c6, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNestTimesConsecutive() {
    --- End diff --
    
    Makes sense. Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r124999092
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
     		}
     
     		/**
    +		 * Create all the states for the group pattern.
    +		 *
    +		 * @param groupPattern the group pattern to create the states for
    +		 * @param sinkState the state that the group pattern being converted should point to
    +		 * @param proceedState the state that the group pattern being converted should proceed to
    +		 * @param isOptional whether the group pattern being converted is optional
    +		 * @return the first state of the states of the group pattern
    +		 */
    +		private State<T> createGroupPatternState(
    +			final GroupPattern<T, ?> groupPattern,
    +			final State<T> sinkState,
    +			final State<T> proceedState,
    +			final boolean isOptional) {
    +			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
    +
    +			Pattern<T, ?> oldCurrentPattern = currentPattern;
    +			Pattern<T, ?> oldFollowingPattern = followingPattern;
    +			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
    +			try {
    --- End diff --
    
    What is the point of this `try` block? Why not:
    
        private State<T> createGroupPatternState(
        	final GroupPattern<T, ?> groupPattern,
        	final State<T> sinkState,
        	final State<T> proceedState,
        	final boolean isOptional) {
        	final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
    
        	Pattern<T, ?> oldCurrentPattern = currentPattern;
        	Pattern<T, ?> oldFollowingPattern = followingPattern;
        	GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
        	State<T> lastSink = sinkState;
        	currentGroupPattern = groupPattern;
        	currentPattern = groupPattern.getRawPattern();
        	lastSink = createMiddleStates(lastSink);
        	lastSink = convertPattern(lastSink);
        	if (isOptional) {
        		// for the first state of a group pattern, its PROCEED edge should point to
        		// the following state of that group pattern
        		lastSink.addProceed(proceedState, trueFunction);
        	}
        	currentPattern = oldCurrentPattern;
        	followingPattern = oldFollowingPattern;
        	currentGroupPattern = oldGroupPattern;
        	return lastSink;
        }
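
For context, a minimal standalone sketch of how the two shapes differ. It does not use the Flink classes: `ScopedStateSketch`, its string fields and `convert` are hypothetical stand-ins for the compiler's mutable fields and conversion step. It also assumes the `try` in the diff is paired with a `finally` that restores the saved fields, which the quoted hunk does not show.

```java
// Hypothetical stand-in for a compiler that temporarily overwrites its own fields.
final class ScopedStateSketch {
    private String currentPattern = "outer";
    private String currentGroupPattern = null;

    String getCurrentPattern() {
        return currentPattern;
    }

    // Straight-line version: the fields are restored only on normal return.
    String convertPlain(String group, boolean fail) {
        String oldCurrent = currentPattern;
        String oldGroup = currentGroupPattern;
        currentGroupPattern = group;
        currentPattern = group + ".raw";
        String result = convert(fail);
        currentPattern = oldCurrent;
        currentGroupPattern = oldGroup;
        return result;
    }

    // try/finally version: the fields are restored on normal and exceptional exit.
    String convertGuarded(String group, boolean fail) {
        String oldCurrent = currentPattern;
        String oldGroup = currentGroupPattern;
        try {
            currentGroupPattern = group;
            currentPattern = group + ".raw";
            return convert(fail);
        } finally {
            currentPattern = oldCurrent;
            currentGroupPattern = oldGroup;
        }
    }

    // Stand-in for the real conversion work; may throw.
    private String convert(boolean fail) {
        if (fail) {
            throw new IllegalStateException("conversion failed");
        }
        return "state(" + currentPattern + ")";
    }

    public static void main(String[] args) {
        ScopedStateSketch plain = new ScopedStateSketch();
        try {
            plain.convertPlain("group", true);
        } catch (IllegalStateException e) {
            // The field still holds the group's value here.
            System.out.println("plain:   " + plain.getCurrentPattern());
        }

        ScopedStateSketch guarded = new ScopedStateSketch();
        try {
            guarded.convertGuarded("group", true);
        } catch (IllegalStateException e) {
            // The field was restored by the finally block.
            System.out.println("guarded: " + guarded.getCurrentPattern());
        }
    }
}
```

The two versions behave the same when nothing throws. The difference only matters if the object is used again after a failed conversion; if it is discarded on failure, the straight-line version is enough.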



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125005277
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -430,6 +431,54 @@ public Quantifier getQuantifier() {
     		return this;
     	}
     
    +	/**
    +	 * Starts a new pattern sequence. The provided pattern is the initial pattern
    +	 * of the new sequence.
    +	 *
    +	 * @param group the pattern to begin with
    +	 * @return the first pattern of a pattern sequence
    +	 */
    +	public static <T, F extends T> GroupPattern<T, F> begin(Pattern<T, F> group) {
    +		return new GroupPattern<>(null, group);
    +	}
    +
    +	/**
    +	 * Appends a new pattern to the existing one. The new pattern enforces non-strict
    --- End diff --
    
    I would update the docs to stress that this method is used for appending a group pattern.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125173876
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java ---
    @@ -153,9 +153,8 @@ public int hashCode() {
     		private final int to;
     
     		private Times(int from, int to) {
    -			Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
    +			Preconditions.checkArgument(from > 0, "The from should be a positive number greater than 0.");
    --- End diff --
    
    Done.
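
To make the effect of the restored check concrete, here is a minimal standalone sketch. `TimesRangeSketch` is a hypothetical class, not the Flink `Quantifier.Times`; it uses a plain `IllegalArgumentException` in place of `Preconditions.checkArgument`, and the `to >= from` check is an assumption that the quoted hunk does not show.

```java
// Hypothetical stand-in for a "times(from, to)" range with the check from the diff.
final class TimesRangeSketch {
    final int from;
    final int to;

    private TimesRangeSketch(int from, int to) {
        // Mirrors the check in the diff: a lower bound of 0 is rejected.
        if (from <= 0) {
            throw new IllegalArgumentException(
                "The from should be a positive number greater than 0.");
        }
        // Assumption, not shown in the quoted hunk: the range must not be inverted.
        if (to < from) {
            throw new IllegalArgumentException(
                "The to should be greater than or equal to from: " + from + ".");
        }
        this.from = from;
        this.to = to;
    }

    static TimesRangeSketch of(int from, int to) {
        return new TimesRangeSketch(from, to);
    }

    // Convenience wrapper that reports validity instead of throwing.
    static boolean isValid(int from, int to) {
        try {
            of(from, to);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("times(2, 3) valid: " + isValid(2, 3));
        System.out.println("times(0, 2) valid: " + isValid(0, 2));
    }
}
```

With this check in place a range such as `times(0, 2)` is rejected. The tests quoted in this thread express "zero or N repetitions" by combining a positive count with `optional()`, as in `times(2).optional()`.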



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125010592
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesCombinations() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a1, b1, a3, b3, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNextZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 3.0);
    +		Event b2 = new Event(44, "b", 3.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 3.0);
    +		Event d = new Event(47, "d", 1.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1L));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
    +		inputEvents.add(new StreamRecord<>(a1, 3L));
    +		inputEvents.add(new StreamRecord<>(b1, 4L));
    +		inputEvents.add(new StreamRecord<>(a2, 5L));
    +		inputEvents.add(new StreamRecord<>(b2, 6L));
    +		inputEvents.add(new StreamRecord<>(a3, 7L));
    +		inputEvents.add(new StreamRecord<>(b3, 8L));
    +		inputEvents.add(new StreamRecord<>(d, 9L));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 6215754202506583964L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).next(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().consecutive().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 7056763917392056548L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNest() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event d = new Event(40, "d", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event c1 = new Event(43, "c", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event c2 = new Event(45, "c", 4.0);
    +		Event e = new Event(46, "e", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(d, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(c1, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(c2, 6));
    +		inputEvents.add(new StreamRecord<>(e, 7));
    +
    --- End diff --
    
    Could you add the equivalent in a pseudo pattern language as a comment? With groups involved, the patterns are extremely hard to read, and future readers would definitely benefit from it.
    
    I have in mind something like: `d (a (b c)*)? e`
    By default we would assume `followedBy` contiguity; for the other types we would write them out, e.g. `d any (a next(b c)*)? e`. What do you think?
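
As a rough illustration of why such a comment helps, the pseudo pattern `d (a (b c)*)? e` can be read like a regular expression over event names. The sketch below is only an analogy, not Flink code: the class `PseudoPatternDemo` and its `matches` helper are hypothetical, each letter stands for one event name, and plain regex matching does not model the relaxed contiguity of `followedBy` (which skips non-matching events). It checks the four event sequences that `testGroupNest` expects.

```java
import java.util.regex.Pattern;

public class PseudoPatternDemo {
    // Regex analogue of the pseudo pattern "d (a (b c)*)? e".
    // Assumption: one character per event name, no skipped events.
    static final Pattern NESTED_GROUP = Pattern.compile("d(a(bc)*)?e");

    // Returns true if the whole sequence of event names fits the pattern.
    static boolean matches(String eventNames) {
        return NESTED_GROUP.matcher(eventNames).matches();
    }

    public static void main(String[] args) {
        // The first four sequences mirror the expected results of testGroupNest;
        // the last one lacks the leading "a" of the outer group.
        String[] sequences = {"de", "dae", "dabce", "dabcbce", "dbce"};
        for (String s : sequences) {
            System.out.println(s + " -> " + matches(s));
        }
    }
}
```

A one-line comment such as `// d (a (b c)*)? e` above the pattern definition would give a reader the same overview without having to unfold the nested `Pattern.begin(...)` calls.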


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125173877
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java ---
    @@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception {
     	}
     
     	@Test
    +	public void testTimesRangeFromZero() {
    --- End diff --
    
    done



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125009025
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    --- End diff --
    
    `testGroupFollowedBy` -> `testGroupFollowedByTimes`



[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4153
  
    @dawidwys Thanks a lot for the review. Updated the doc.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125173832
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
     		}
     
     		/**
    +		 * Create all the states for the group pattern.
    +		 *
    +		 * @param groupPattern the group pattern to create the states for
    +		 * @param sinkState the state that the group pattern being converted should point to
    +		 * @param proceedState the state that the group pattern being converted should proceed to
    +		 * @param isOptional whether the group pattern being converted is optional
    +		 * @return the first state of the states of the group pattern
    +		 */
    +		private State<T> createGroupPatternState(
    +			final GroupPattern<T, ?> groupPattern,
    +			final State<T> sinkState,
    +			final State<T> proceedState,
    +			final boolean isOptional) {
    +			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
    +
    +			Pattern<T, ?> oldCurrentPattern = currentPattern;
    +			Pattern<T, ?> oldFollowingPattern = followingPattern;
    +			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
    +			try {
    +				State<T> lastSink = sinkState;
    +				currentGroupPattern = groupPattern;
    +				currentPattern = groupPattern.getRawPattern();
    +				lastSink = createMiddleStates(lastSink);
    +				lastSink = convertPattern(lastSink);
    +				if (isOptional) {
    +					// for the first state of a group pattern, its PROCEED edge should point to
    +					// the following state of that group pattern
    +					lastSink.addProceed(proceedState, trueFunction);
    +				}
    +				return lastSink;
    +			} finally {
    +				currentPattern = oldCurrentPattern;
    +				followingPattern = oldFollowingPattern;
    +				currentGroupPattern = oldGroupPattern;
    +			}
    +		}
    +
    +		/**
    +		 * Create the states for the group pattern as a looping one.
    +		 *
    +		 * @param groupPattern the group pattern to create the states for
    +		 * @param sinkState the state that the group pattern being converted should point to
    +		 * @return the first state of the states of the group pattern
    +		 */
    +		private State<T> createLoopingGroupPatternState(
    +			final GroupPattern<T, ?> groupPattern,
    +			final State<T> sinkState) {
    +			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
    +
    +			Pattern<T, ?> oldCurrentPattern = currentPattern;
    +			Pattern<T, ?> oldFollowingPattern = followingPattern;
    +			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
    +			try {
    --- End diff --
    
    updated.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125012236
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesCombinations() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a1, b1, a3, b3, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNextZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 3.0);
    +		Event b2 = new Event(44, "b", 3.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 3.0);
    +		Event d = new Event(47, "d", 1.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1L));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
    +		inputEvents.add(new StreamRecord<>(a1, 3L));
    +		inputEvents.add(new StreamRecord<>(b1, 4L));
    +		inputEvents.add(new StreamRecord<>(a2, 5L));
    +		inputEvents.add(new StreamRecord<>(b2, 6L));
    +		inputEvents.add(new StreamRecord<>(a3, 7L));
    +		inputEvents.add(new StreamRecord<>(b3, 8L));
    +		inputEvents.add(new StreamRecord<>(d, 9L));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 6215754202506583964L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).next(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().consecutive().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 7056763917392056548L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNest() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event d = new Event(40, "d", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event c1 = new Event(43, "c", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event c2 = new Event(45, "c", 4.0);
    +		Event e = new Event(46, "e", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(d, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(c1, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(c2, 6));
    +		inputEvents.add(new StreamRecord<>(e, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		}).followedBy("middle3").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		})).oneOrMore().optional()).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(d, e),
    +			Lists.newArrayList(d, a1, e),
    +			Lists.newArrayList(d, a1, b1, c1, e),
    +			Lists.newArrayList(d, a1, b1, c1, b2, c2, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNestTimes() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event d = new Event(40, "d", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event c1 = new Event(43, "c", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event c2 = new Event(45, "c", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event c3 = new Event(47, "c", 4.0);
    +		Event a2 = new Event(48, "a", 2.0);
    +		Event b4 = new Event(49, "b", 3.0);
    +		Event c4 = new Event(50, "c", 4.0);
    +		Event b5 = new Event(51, "b", 5.0);
    +		Event c5 = new Event(52, "c", 4.0);
    +		Event b6 = new Event(53, "b", 5.0);
    +		Event c6 = new Event(54, "c", 4.0);
    +		Event e = new Event(55, "e", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(d, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(c1, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(c2, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(c3, 8));
    +		inputEvents.add(new StreamRecord<>(a2, 9));
    +		inputEvents.add(new StreamRecord<>(b4, 10));
    +		inputEvents.add(new StreamRecord<>(c4, 11));
    +		inputEvents.add(new StreamRecord<>(b5, 12));
    +		inputEvents.add(new StreamRecord<>(c5, 13));
    +		inputEvents.add(new StreamRecord<>(b6, 14));
    +		inputEvents.add(new StreamRecord<>(c6, 15));
    +		inputEvents.add(new StreamRecord<>(e, 16));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		}).followedBy("middle3").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		})).times(3)).times(0, 2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("e");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(d, e),
    +			Lists.newArrayList(d, a1, b1, c1, b2, c2, b3, c3, e),
    +			Lists.newArrayList(d, a2, b4, c4, b5, c5, b6, c6, e),
    +			Lists.newArrayList(d, a1, b1, c1, b2, c2, b3, c3, a2, b4, c4, b5, c5, b6, c6, e)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNestTimesConsecutive() {
    --- End diff --
    
    I would test it differently. In general, I think we should rarely rely on tests that expect no results, or we should at least pair them with an equivalent positive test. (It is usually hard to tell whether an empty result is due to the expected behaviour or to the library not working at all.)
    
    How about:
    
        @Test
        public void testGroupNestTimesConsecutive() {
        	List<StreamRecord<Event>> inputEvents = new ArrayList<>();
        
        	Event d = new Event(40, "d", 1.0);
        	Event a1 = new Event(41, "a", 2.0);
        	Event b1 = new Event(42, "b", 3.0);
        	Event c1 = new Event(43, "c", 4.0);
        	Event b2 = new Event(44, "b", 5.0);
        	Event c2 = new Event(45, "c", 4.0);
        	Event b3 = new Event(46, "b", 5.0);
        	Event c3 = new Event(47, "c", 4.0);
        	Event a2 = new Event(48, "a", 2.0);
        	Event b4 = new Event(49, "b", 3.0);
        	Event c4 = new Event(50, "c", 4.0);
        	Event b5 = new Event(51, "b", 5.0);
        	Event c5 = new Event(52, "c", 4.0);
        	Event b6 = new Event(53, "b", 5.0);
        	Event c6 = new Event(54, "c", 4.0);
        	Event e = new Event(55, "e", 6.0);
        
        	inputEvents.add(new StreamRecord<>(d, 1));
        	inputEvents.add(new StreamRecord<>(a1, 2));
        	inputEvents.add(new StreamRecord<>(b1, 3));
        	inputEvents.add(new StreamRecord<>(c1, 4));
        	inputEvents.add(new StreamRecord<>(b2, 5));
        	inputEvents.add(new StreamRecord<>(c2, 6));
        	inputEvents.add(new StreamRecord<>(b3, 7));
        	inputEvents.add(new StreamRecord<>(c3, 8));
        	inputEvents.add(new StreamRecord<>(a2, 9));
        	inputEvents.add(new StreamRecord<>(b4, 10));
        	inputEvents.add(new StreamRecord<>(c4, 11));
        	inputEvents.add(new StreamRecord<>(new Event(0, "breaking", 99.0), 12));
        	inputEvents.add(new StreamRecord<>(b5, 13));
        	inputEvents.add(new StreamRecord<>(c5, 14));
        	inputEvents.add(new StreamRecord<>(b6, 15));
        	inputEvents.add(new StreamRecord<>(c6, 16));
        	inputEvents.add(new StreamRecord<>(e, 17));
        
        	Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
        		private static final long serialVersionUID = 5726188262756267490L;
        
        		@Override
        		public boolean filter(Event value) throws Exception {
        			return value.getName().equals("d");
        		}
        	}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
        		private static final long serialVersionUID = 5726188262756267490L;
        
        		@Override
        		public boolean filter(Event value) throws Exception {
        			return value.getName().equals("a");
        		}
        	}).followedBy(Pattern.<Event>begin("middle2").where(new SimpleCondition<Event>() {
        		private static final long serialVersionUID = 5726188262756267490L;
        
        		@Override
        		public boolean filter(Event value) throws Exception {
        			return value.getName().equals("b");
        		}
        	}).followedBy("middle3").where(new SimpleCondition<Event>() {
        		private static final long serialVersionUID = 5726188262756267490L;
        
        		@Override
        		public boolean filter(Event value) throws Exception {
        			return value.getName().equals("c");
        		}
        	})).times(3).consecutive()).times(0, 2).consecutive().followedBy("end").where(new SimpleCondition<Event>() {
        		private static final long serialVersionUID = 5726188262756267490L;
        
        		@Override
        		public boolean filter(Event value) throws Exception {
        			return value.getName().equals("e");
        		}
        	});
        
        	NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
        
        	final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
        
        	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
        		Lists.newArrayList(d, e),
        		Lists.newArrayList(d, a1, b1, c1, b2, c2, b3, c3, e)
        	));
        }
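
To illustrate the point about positive tests, here is a minimal, self-contained sketch. It does not use the Flink CEP API; `PositiveTestSketch`, `consecutiveRuns` and `brokenMatcher` are hypothetical names invented for this example. It shows that an assertion expecting no matches also passes for a matcher that never matches anything, while a positive assertion does not.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a pattern matcher; not part of Flink. */
final class PositiveTestSketch {

    /** Returns the start index of every run of {@code times} consecutive "b" events. */
    static List<Integer> consecutiveRuns(List<String> events, int times) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i + times <= events.size(); i++) {
            boolean allMatch = true;
            for (int j = i; j < i + times; j++) {
                if (!events.get(j).equals("b")) {
                    allMatch = false;
                    break;
                }
            }
            if (allMatch) {
                starts.add(i);
            }
        }
        return starts;
    }

    /** A deliberately broken matcher that never reports any match. */
    static List<Integer> brokenMatcher(List<String> events, int times) {
        return new ArrayList<>();
    }
}
```

With the input `b, x, b` and `times = 2`, both methods return an empty list, so a test that only asserts "no results" cannot tell them apart. With `b, b, x, b`, only `consecutiveRuns` reports the run starting at index 0, so a positive expectation exposes the broken matcher.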


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4153
  
    @dianfu Sorry, I have not reviewed it yet, but I do think this feature would benefit from the rework of the Pattern API that I proposed in [FLINK-3414](https://issues.apache.org/jira/browse/FLINK-3414).
    
    Instead of checking for the head/tail of a group pattern and caching them, we could more or less reuse the current code for plain sequence creation and have it return begin/end states. The code for joining groups would then be much simpler, because it would operate on already translated sequences.
    
    Also, I think the new API would make [FLINK-4641](https://issues.apache.org/jira/browse/FLINK-4641) much easier.
    
    As it would require an API rework, I would really like to hear @kl0u's opinion. If we agree not to change the API, though, I will go straight to reviewing this PR.
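
The idea of translating each sequence into a piece that exposes its begin/end states can be sketched roughly as follows. This is only an illustration under assumptions: `FragmentSketch`, `State`, `Fragment`, `sequence`, `join` and `oneOrMore` are hypothetical names, not classes or methods from `NFACompiler`, and conditions, quantifier details and contiguity are left out entirely.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types exist in Flink. */
final class FragmentSketch {

    /** A named state with outgoing edges to other states. */
    static final class State {
        final String name;
        final List<State> next = new ArrayList<>();

        State(String name) {
            this.name = name;
        }
    }

    /** An already translated sequence that exposes only its entry and exit states. */
    static final class Fragment {
        final State begin;
        final State end;

        Fragment(State begin, State end) {
            this.begin = begin;
            this.end = end;
        }
    }

    /** Translates a plain sequence of stage names (at least one) into a chain of states. */
    static Fragment sequence(String... names) {
        State begin = new State(names[0]);
        State current = begin;
        for (int i = 1; i < names.length; i++) {
            State state = new State(names[i]);
            current.next.add(state);
            current = state;
        }
        return new Fragment(begin, current);
    }

    /** Joins two fragments by wiring the end of the first to the begin of the second. */
    static Fragment join(Fragment first, Fragment second) {
        first.end.next.add(second.begin);
        return new Fragment(first.begin, second.end);
    }

    /** Makes a fragment repeatable by adding an edge from its end back to its begin. */
    static Fragment oneOrMore(Fragment fragment) {
        fragment.end.next.add(fragment.begin);
        return fragment;
    }
}
```

In this sketch a group is just another `Fragment`, so `join(join(sequence("start"), oneOrMore(sequence("middle1", "middle2"))), sequence("end"))` needs no head/tail lookup or caching: joining only touches the exposed `begin` and `end` states.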



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125005892
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java ---
    @@ -92,6 +92,57 @@ public boolean filter(Event value) throws Exception {
     	}
     
     	@Test
    +	public void testTimesRangeFromZero() {
    --- End diff --
    
    Please move this to a separate JIRA/PR.



[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4153
  
    @dianfu Thanks for the update. The code looks really nice now. The only thing missing from this PR is the docs. Could you please add a section about group patterns?



[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4153
  
    Rebased the code. @dawidwys @kl0u, could you help take a look at this PR?



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125173760
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -366,8 +366,9 @@ public Quantifier getQuantifier() {
     		checkIfNoNotPattern();
     		checkIfQuantifierApplied();
     		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
    -		if (from == 0) {
    --- End diff --
    
    Thanks for the suggestion, created PR: https://github.com/apache/flink/pull/4242



[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/4153
  
    Hi @dianfu and @dawidwys.
    
    I think that we should stick to the current API because:
        1) a new API would raise serious backwards compatibility concerns,
        2) people have already started using the current API, and
        3) there are two parallel efforts going on (SQL/CEP standalone) and we should not block one on the other.
    
    If we agree on this, then this PR is ready for review, @dawidwys.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125176383
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    --- End diff --
    
    done



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125176384
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    --- End diff --
    
    done



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125009258
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    --- End diff --
    
    Remove the elements you don't use; otherwise the test is misleading.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125173912
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -430,6 +431,54 @@ public Quantifier getQuantifier() {
     		return this;
     	}
     
    +	/**
    +	 * Starts a new pattern sequence. The provided pattern is the initial pattern
    +	 * of the new sequence.
    +	 *
    +	 * @param group the pattern to begin with
    +	 * @return the first pattern of a pattern sequence
    +	 */
    +	public static <T, F extends T> GroupPattern<T, F> begin(Pattern<T, F> group) {
    +		return new GroupPattern<>(null, group);
    +	}
    +
    +	/**
    +	 * Appends a new pattern to the existing one. The new pattern enforces non-strict
    --- End diff --
    
    done



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125173871
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java ---
    @@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) {
     		if (usedNames.contains(name)) {
     			throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
     		}
    +		usedNames.add(name);
    +	}
    +
    --- End diff --
    
    updated



[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4153
  
    @dawidwys @kl0u It would be great if you could take a look at this PR.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125005984
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -455,6 +548,76 @@ private void addStopStateToLooping(final State<T> loopingState) {
     		}
     
     		/**
    +		 * Create all the states for the group pattern.
    +		 *
    +		 * @param groupPattern the group pattern to create the states for
    +		 * @param sinkState the state that the group pattern being converted should point to
    +		 * @param proceedState the state that the group pattern being converted should proceed to
    +		 * @param isOptional whether the group pattern being converted is optional
    +		 * @return the first state of the states of the group pattern
    +		 */
    +		private State<T> createGroupPatternState(
    +			final GroupPattern<T, ?> groupPattern,
    +			final State<T> sinkState,
    +			final State<T> proceedState,
    +			final boolean isOptional) {
    +			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
    +
    +			Pattern<T, ?> oldCurrentPattern = currentPattern;
    +			Pattern<T, ?> oldFollowingPattern = followingPattern;
    +			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
    +			try {
    +				State<T> lastSink = sinkState;
    +				currentGroupPattern = groupPattern;
    +				currentPattern = groupPattern.getRawPattern();
    +				lastSink = createMiddleStates(lastSink);
    +				lastSink = convertPattern(lastSink);
    +				if (isOptional) {
    +					// for the first state of a group pattern, its PROCEED edge should point to
    +					// the following state of that group pattern
    +					lastSink.addProceed(proceedState, trueFunction);
    +				}
    +				return lastSink;
    +			} finally {
    +				currentPattern = oldCurrentPattern;
    +				followingPattern = oldFollowingPattern;
    +				currentGroupPattern = oldGroupPattern;
    +			}
    +		}
    +
    +		/**
    +		 * Create the states for the group pattern as a looping one.
    +		 *
    +		 * @param groupPattern the group pattern to create the states for
    +		 * @param sinkState the state that the group pattern being converted should point to
    +		 * @return the first state of the states of the group pattern
    +		 */
    +		private State<T> createLoopingGroupPatternState(
    +			final GroupPattern<T, ?> groupPattern,
    +			final State<T> sinkState) {
    +			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
    +
    +			Pattern<T, ?> oldCurrentPattern = currentPattern;
    +			Pattern<T, ?> oldFollowingPattern = followingPattern;
    +			GroupPattern<T, ?> oldGroupPattern = currentGroupPattern;
    +			try {
    --- End diff --
    
    Same as above. Why `try`?
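
For readers following the question, here is a minimal, self-contained sketch of the save-and-restore shape that the quoted hunk uses. It is not Flink code: `ScopedStateSketch`, its `String` field, and the `fail` flag are hypothetical stand-ins for the compiler's `currentPattern` bookkeeping, chosen only to show what the `finally` block guarantees.

```java
/** Hypothetical stand-in for a compiler that tracks the pattern it is currently converting. */
final class ScopedStateSketch {
    private String currentPattern = "outer";

    /**
     * Converts a nested group while temporarily switching the current pattern.
     * The previous value is restored on both normal and exceptional exit.
     */
    String convertGroup(String groupPattern, boolean fail) {
        String oldCurrentPattern = currentPattern; // save the caller's state
        try {
            currentPattern = groupPattern;
            if (fail) {
                // Simulated failure in the middle of the conversion.
                throw new IllegalStateException("conversion failed for " + groupPattern);
            }
            return "state:" + currentPattern;
        } finally {
            currentPattern = oldCurrentPattern; // always runs, even when the body throws
        }
    }

    String getCurrentPattern() {
        return currentPattern;
    }
}
```

If nothing in the body can throw, or if a failure aborts the whole compilation anyway, a plain restore after the body behaves the same as the `finally` version.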



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r124994181
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java ---
    @@ -55,6 +55,11 @@ public void checkNameUniqueness(String name) {
     		if (usedNames.contains(name)) {
     			throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
     		}
    +		usedNames.add(name);
    +	}
    +
    --- End diff --
    
    The Javadoc is missing.
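
As an illustration of the kind of documentation being requested, the following self-contained sketch mirrors only the check-then-register logic visible in the hunk above. `NameRegistrySketch` is a hypothetical class, and `IllegalArgumentException` stands in for Flink's `MalformedPatternException` so the example compiles on its own; the method the comment actually refers to is cut off in the diff and is not reproduced here.

```java
import java.util.HashSet;
import java.util.Set;

/**
 * Sketch of a helper that tracks which state names are already taken.
 * Hypothetical stand-in, not the Flink class.
 */
final class NameRegistrySketch {
    private final Set<String> usedNames = new HashSet<>();

    /**
     * Checks that the given name has not been used before and registers it.
     *
     * @param name the name to check and register
     * @throws IllegalArgumentException if the name was already registered
     */
    void checkNameUniqueness(String name) {
        if (usedNames.contains(name)) {
            throw new IllegalArgumentException(
                "Duplicate pattern name: " + name + ". Names must be unique.");
        }
        // Registering here means a second call with the same name fails.
        usedNames.add(name);
    }
}
```

The Javadoc states both the side effect (the name is registered) and the failure condition, which the method name alone does not convey.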



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125009695
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesCombinations() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a1, b1, a3, b3, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNextZeroOrMore() {
    --- End diff --
    
    A positive version would be useful, I think. By positive I mean one without the breaking element.



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125176457
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesCombinations() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a1, b1, a3, b3, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNextZeroOrMore() {
    --- End diff --
    
    done



[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125176801
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java ---
    @@ -0,0 +1,807 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.cep.nfa;
    +
    +import org.apache.flink.cep.Event;
    +import org.apache.flink.cep.nfa.compiler.NFACompiler;
    +import org.apache.flink.cep.pattern.GroupPattern;
    +import org.apache.flink.cep.pattern.Pattern;
    +import org.apache.flink.cep.pattern.conditions.SimpleCondition;
    +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
    +import org.apache.flink.util.TestLogger;
    +
    +import com.google.common.collect.Lists;
    +import org.junit.Test;
    +
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
    +import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
    +
    +/**
    + * IT tests covering {@link GroupPattern}.
    + */
    +@SuppressWarnings("unchecked")
    +public class GroupITCase extends TestLogger {
    +
    +	@Test
    +	public void testGroupFollowedBy() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event a2 = new Event(42, "a", 2.0);
    +		Event b = new Event(43, "b", 3.0);
    +		Event d = new Event(44, "d", 4.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(a2, 3));
    +		inputEvents.add(new StreamRecord<>(b, 4));
    +		inputEvents.add(new StreamRecord<>(d, 5));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testFollowedByGroupTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByOneOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event d = new Event(45, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(d, 6));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesCombinations() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a1, b1, a3, b3, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupFollowedByAnyTimesOptional() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 5.0);
    +		Event d = new Event(47, "d", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(a2, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(a3, 6));
    +		inputEvents.add(new StreamRecord<>(b3, 7));
    +		inputEvents.add(new StreamRecord<>(d, 8));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d),
    +			Lists.newArrayList(c, a1, b1, a2, b2, d),
    +			Lists.newArrayList(c, a2, b2, a3, b3, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNextZeroOrMore() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event c = new Event(40, "c", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event a2 = new Event(43, "a", 3.0);
    +		Event b2 = new Event(44, "b", 3.0);
    +		Event a3 = new Event(45, "a", 4.0);
    +		Event b3 = new Event(46, "b", 3.0);
    +		Event d = new Event(47, "d", 1.0);
    +
    +		inputEvents.add(new StreamRecord<>(c, 1L));
    +		inputEvents.add(new StreamRecord<>(new Event(1, "event", 1.0), 2L));
    +		inputEvents.add(new StreamRecord<>(a1, 3L));
    +		inputEvents.add(new StreamRecord<>(b1, 4L));
    +		inputEvents.add(new StreamRecord<>(a2, 5L));
    +		inputEvents.add(new StreamRecord<>(b2, 6L));
    +		inputEvents.add(new StreamRecord<>(a3, 7L));
    +		inputEvents.add(new StreamRecord<>(b3, 8L));
    +		inputEvents.add(new StreamRecord<>(d, 9L));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 6215754202506583964L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).next(Pattern.<Event>begin("middle1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).followedBy("middle2").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		})).oneOrMore().optional().consecutive().followedBy("end").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 7056763917392056548L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("d");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(c, d)
    +		));
    +	}
    +
    +	@Test
    +	public void testGroupNest() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		Event d = new Event(40, "d", 1.0);
    +		Event a1 = new Event(41, "a", 2.0);
    +		Event b1 = new Event(42, "b", 3.0);
    +		Event c1 = new Event(43, "c", 4.0);
    +		Event b2 = new Event(44, "b", 5.0);
    +		Event c2 = new Event(45, "c", 4.0);
    +		Event e = new Event(46, "e", 6.0);
    +
    +		inputEvents.add(new StreamRecord<>(d, 1));
    +		inputEvents.add(new StreamRecord<>(a1, 2));
    +		inputEvents.add(new StreamRecord<>(b1, 3));
    +		inputEvents.add(new StreamRecord<>(c1, 4));
    +		inputEvents.add(new StreamRecord<>(b2, 5));
    +		inputEvents.add(new StreamRecord<>(c2, 6));
    +		inputEvents.add(new StreamRecord<>(e, 7));
    +
    --- End diff --
    
    Makes sense. Updated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4153#discussion_r125005796
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -366,8 +366,9 @@ public Quantifier getQuantifier() {
     		checkIfNoNotPattern();
     		checkIfQuantifierApplied();
     		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
    -		if (from == 0) {
    --- End diff --
    
    Could you submit the change to TimesRange in a separate JIRA/PR? Let's make one change at a time.



[GitHub] flink issue #4153: [FLINK-6927] [cep] Support pattern group in CEP

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4153
  
    @dawidwys Thanks a lot for your comments. I have updated the PR, and it should now address all of them. :)

