Posted to issues@flink.apache.org by dianfu <gi...@git.apache.org> on 2017/06/14 04:25:17 UTC

[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

GitHub user dianfu opened a pull request:

    https://github.com/apache/flink/pull/4121

    [FLINK-6904] [cep] Support for quantifier range to CEP's pattern API

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [ ] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dianfu/flink FLINK-6904

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4121.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4121
    
----
commit ff703bf8ba02b4221ca02440566fae7c450ce95b
Author: Dian Fu <fu...@alibaba-inc.com>
Date:   2017-06-14T03:19:41Z

    [FLINK-6904] [cep] Support for quantifier range to CEP's pattern API

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121953691
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception {
     		));
     	}
     
    +	@Test
    +	public void testTimesRangeNonStrictOptional1() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(1, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictOptional3() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictWithNext() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).next("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNotStrictWithFollowedByEager() {
    --- End diff --
    
    Agreed, addressed in the new patch.



[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r122633648
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -318,7 +319,30 @@ public Quantifier getQuantifier() {
     		checkIfQuantifierApplied();
     		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
     		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
    -		this.times = times;
    +		this.times = Times.of(times);
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies that the pattern can occur between from and to times.
    +	 *
    +	 * @param from number of times matching event must appear at least
    +	 * @param to number of times matching event must appear at most
    +	 * @return The same pattern with the number of times range applied
    +	 *
    +	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
    +	 */
    +	public Pattern<T, F> times(int from, int to) {
    +		checkIfNoNotPattern();
    +		checkIfQuantifierApplied();
    +		Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
    --- End diff --
    
    This is already checked in the `Times` class. Maybe remove those checks from here?
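
The point about duplicated validation can be illustrated with a minimal, self-contained sketch. It is not Flink's code: `RangeSketch` and its error messages are hypothetical stand-ins for the `Times` class mentioned above, and the exact bounds Flink enforces are an assumption here. The idea is only that the value object validates its own invariants once, so a caller such as `Pattern.times(from, to)` would not need to repeat the checks.

```java
// Hypothetical stand-in for the `Times` value class discussed above; not Flink's actual code.
final class RangeSketch {
    private final int from;
    private final int to;

    private RangeSketch(int from, int to) {
        // Assumed invariants: a non-negative lower bound and an upper bound that is not below it.
        if (from < 0) {
            throw new IllegalArgumentException("from must be >= 0, got " + from);
        }
        if (to < from) {
            throw new IllegalArgumentException("to must be >= from (" + from + "), got " + to);
        }
        this.from = from;
        this.to = to;
    }

    // Range quantifier: between `from` and `to` occurrences, both inclusive.
    static RangeSketch of(int from, int to) {
        return new RangeSketch(from, to);
    }

    // Exact quantifier: expressed as a range whose bounds coincide.
    static RangeSketch of(int times) {
        return new RangeSketch(times, times);
    }

    int getFrom() {
        return from;
    }

    int getTo() {
        return to;
    }
}
```

With the checks living in the factory methods, every construction path is validated the same way, which is presumably why repeating them at the call site was considered redundant.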



[GitHub] flink issue #4121: [FLINK-6904] [cep] Support for quantifier range to CEP's ...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4121
  
    Hi @dianfu, sorry for the late reply. I was on a short vacation. The updated PR looks good! I agree with @kl0u, though, that the changes from #4132 should be covered by this PR.




[GitHub] flink issue #4121: [FLINK-6904] [cep] Support for quantifier range to CEP's ...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4121
  
    Hi @dawidwys, thanks a lot for the review. What are your thoughts on the updated patch?



[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r122634439
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java ---
    @@ -318,7 +319,30 @@ public Quantifier getQuantifier() {
     		checkIfQuantifierApplied();
     		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
     		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
    -		this.times = times;
    +		this.times = Times.of(times);
    +		return this;
    +	}
    +
    +	/**
    +	 * Specifies that the pattern can occur between from and to times.
    +	 *
    +	 * @param from number of times matching event must appear at least
    +	 * @param to number of times matching event must appear at most
    +	 * @return The same pattern with the number of times range applied
    +	 *
    +	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
    +	 */
    +	public Pattern<T, F> times(int from, int to) {
    +		checkIfNoNotPattern();
    +		checkIfQuantifierApplied();
    +		Preconditions.checkArgument(from >= 0, "The from should be a non-negative number greater than or equal to 0.");
    --- End diff --
    
    Makes sense, removed.



[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121953266
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception {
     		));
     	}
     
    +	@Test
    +	public void testTimesRangeNonStrictOptional1() {
    --- End diff --
    
    The next two tests differ slightly from this one:
    testTimesRangeNonStrictOptional1: followedBy().times().optional().followedBy()
    testTimesRangeNonStrictOptional2: followedByAny().times().allowCombinations().optional().followedBy()
    testTimesRangeNonStrictOptional3: followedByAny().times().optional().followedBy()
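
The difference between the second and third variants shows up in the expected results of the quoted tests `testTimesRangeNonStrictOptional2` and `testTimesRangeNonStrictOptional3`. The sketch below is a plain-Java model of those expected results only; it does not use Flink and says nothing about how the NFA computes matches. `TimesRangeModel`, `combinations`, and `runs` are hypothetical names, the input list stands for the events that already satisfy the "middle" condition, and `from >= 1` is assumed. In the tests, `optional()` additionally yields the match with no middle event at all.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of the expected test results; not Flink's NFA.
final class TimesRangeModel {

    // Every order-preserving subset of `events` whose size lies in [from, to].
    // Models times(from, to).allowCombinations() after followedByAny().
    static <T> List<List<T>> combinations(List<T> events, int from, int to) {
        List<List<T>> out = new ArrayList<>();
        collect(events, 0, new ArrayList<>(), from, to, out);
        return out;
    }

    private static <T> void collect(List<T> events, int index, List<T> current,
                                    int from, int to, List<List<T>> out) {
        if (index == events.size()) {
            if (current.size() >= from && current.size() <= to) {
                out.add(new ArrayList<>(current));
            }
            return;
        }
        current.add(events.get(index));       // branch 1: take events[index]
        collect(events, index + 1, current, from, to, out);
        current.remove(current.size() - 1);   // branch 2: skip events[index]
        collect(events, index + 1, current, from, to, out);
    }

    // Every run of consecutive matching events whose length lies in [from, to].
    // Models times(from, to) without allowCombinations() after followedByAny().
    static <T> List<List<T>> runs(List<T> events, int from, int to) {
        List<List<T>> out = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            for (int len = from; len <= to && start + len <= events.size(); len++) {
                out.add(new ArrayList<>(events.subList(start, start + len)));
            }
        }
        return out;
    }
}
```

For three middle events and `times(2, 3)`, `combinations` yields four sequences, including the one that skips the second event, while `runs` yields three and never skips a matching event in the middle. That mirrors the one extra expected pattern in the `allowCombinations()` test.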



[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121943356
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception {
     		));
     	}
     
    +	@Test
    +	public void testTimesRangeNonStrictOptional1() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(1, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictOptional3() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictWithNext() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).next("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNotStrictWithFollowedByEager() {
    --- End diff --
    
    Maybe change the name to `testTimesRangeNotStrictWithFollowedBy`. I think eager may be misleading here. What do you think?



[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4121



[GitHub] flink issue #4121: [FLINK-6904] [cep] Support for quantifier range to CEP's ...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on the issue:

    https://github.com/apache/flink/pull/4121
  
    @dawidwys Makes sense, I have updated the patch.



[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121952296
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1391,6 +1391,248 @@ public boolean filter(Event value) throws Exception {
     	}
    --- End diff --
    
    Agreed. Moved the newly added tests to the file TestsRangeITCase.



[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dianfu <gi...@git.apache.org>.
Github user dianfu commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121953747
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception {
     		));
     	}
     
    +	@Test
    +	public void testTimesRangeNonStrictOptional1() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(1, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictOptional2() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).allowCombinations().optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictOptional3() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 3));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 4));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedByAny("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).optional().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNonStrictWithNext() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 3));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
    +		inputEvents.add(new StreamRecord<>(new Event(23, "f", 1.0), 5));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).next("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).allowCombinations().followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent3, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNotStrictWithFollowedByEager() {
    +		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
    +
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent1, 2));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent2, 4));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.middleEvent3, 6));
    +		inputEvents.add(new StreamRecord<>(ConsecutiveData.end, 7));
    +
    +		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("c");
    +			}
    +		}).followedBy("middle").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("a");
    +			}
    +		}).times(2, 3).followedBy("end1").where(new SimpleCondition<Event>() {
    +			private static final long serialVersionUID = 5726188262756267490L;
    +
    +			@Override
    +			public boolean filter(Event value) throws Exception {
    +				return value.getName().equals("b");
    +			}
    +		});
    +
    +		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    +
    +		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
    +
    +		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.middleEvent3, ConsecutiveData.end),
    +			Lists.newArrayList(ConsecutiveData.startEvent, ConsecutiveData.middleEvent1, ConsecutiveData.middleEvent2, ConsecutiveData.end)
    +		));
    +	}
    +
    +	@Test
    +	public void testTimesRangeNotStrictWithFollowedByNotEager() {
    --- End diff --
    
    Agreed, addressed in the new patch.


---

[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121942275
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception {
     		));
     	}
     
    +	@Test
    +	public void testTimesRangeNonStrictOptional1() {
    --- End diff --
    
    I think it is covered by the next two tests.


---

[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121939394
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1391,6 +1391,248 @@ public boolean filter(Event value) throws Exception {
     	}
    --- End diff --
    
    Maybe let's move all those new tests into a separate class like `SameElementITCase` or `IterativeConditionsITCase`? The NFAITCase is already too long in my opinion.


---

[GitHub] flink pull request #4121: [FLINK-6904] [cep] Support for quantifier range to...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4121#discussion_r121943417
  
    --- Diff: flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java ---
    @@ -1905,6 +2147,275 @@ public boolean filter(Event value) throws Exception {
     		));
     	}
     
    +	@Test
    +	public void testTimesRangeNotStrictWithFollowedByNotEager() {
    --- End diff --
    
    Maybe change the name to `testTimesRangeNotStrictWithFollowedByAny`. I think eager may be misleading here. What do you think?
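
The naming question above comes down to what `followedBy` and `followedByAny` select in front of a `times(2, 3)` loop. The sketch below is a toy model in plain Java, not Flink code: the class `QuantifierRangeModel`, the method `matches`, and its boolean flags are hypothetical names, and the rules are only inferred from the expected results of the tests quoted in this thread. It assumes `1 <= min <= max` and looks only at the events that satisfy the "middle" condition, numbered 0, 1, 2 for `middleEvent1`, `middleEvent2`, `middleEvent3`.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not Flink's NFA) of the "middle" events a times(min, max) loop selects. */
class QuantifierRangeModel {

    /**
     * @param n            number of events matching the "middle" condition, indexed 0..n-1
     * @param anyStart     models followedByAny: the loop may begin at any candidate
     * @param combinations models allowCombinations(): later candidates may be skipped
     * @param optional     models optional(): the empty selection is also a match
     */
    static List<List<Integer>> matches(int n, int min, int max,
                                       boolean anyStart, boolean combinations, boolean optional) {
        List<List<Integer>> out = new ArrayList<>();
        if (optional) {
            out.add(new ArrayList<>());
        }
        // Without followedByAny, only the first candidate can open the loop.
        int lastStart = anyStart ? n - 1 : Math.min(0, n - 1);
        for (int start = 0; start <= lastStart; start++) {
            List<Integer> current = new ArrayList<>();
            current.add(start);
            extend(current, start + 1, n, min, max, combinations, out);
        }
        return out;
    }

    private static void extend(List<Integer> current, int next, int n, int min, int max,
                               boolean combinations, List<List<Integer>> out) {
        if (current.size() >= min) {
            out.add(new ArrayList<>(current));
        }
        if (current.size() == max) {
            return;
        }
        // Without combinations only the very next candidate may be taken; with them, any later one.
        int limit = combinations ? n : Math.min(next + 1, n);
        for (int i = next; i < limit; i++) {
            current.add(i);
            extend(current, i + 1, n, min, max, combinations, out);
            current.remove(current.size() - 1);
        }
    }

    public static void main(String[] args) {
        // followedBy + times(2, 3): prints [[0, 1], [0, 1, 2]]
        System.out.println(matches(3, 2, 3, false, false, false));
        // followedByAny + times(2, 3) + optional(): prints [[], [0, 1], [0, 1, 2], [1, 2]]
        System.out.println(matches(3, 2, 3, true, false, true));
    }
}
```

Under this model the `followedBy` case yields the two selections expected by `testTimesRangeNotStrictWithFollowedByEager`, while the `followedByAny` case additionally allows the loop to start at a later event, as in `testTimesRangeNonStrictOptional3`. Whether Flink's NFA behaves this way for other inputs is not something this sketch establishes.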


---

[GitHub] flink issue #4121: [FLINK-6904] [cep] Support for quantifier range to CEP's ...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/4121
  
    Thanks a lot, @dianfu, for the work here. I had one last comment and will merge this as soon as Travis is green.


---