Posted to issues@flink.apache.org by mushketyk <gi...@git.apache.org> on 2016/08/11 21:12:22 UTC

[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

GitHub user mushketyk opened a pull request:

    https://github.com/apache/flink/pull/2361

    [FLINK-3318][cep] Add support for quantifiers to CEP's pattern API

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink cep-operators

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2361.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2361
    
----
commit 96c077a4c1c2c1ccba678ec863775402554d6dcf
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-08-05T20:05:39Z

    [FLINK-3318][cep] Add support for quantifiers to CEP's pattern API

commit 425fa3d94d15ea6b1396e7bf7a901f7318f107b0
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-08-11T21:10:43Z

    [FLINK-3318][cep] Add documentation about pattern quantifiers

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by tttMelody <gi...@git.apache.org>.
Github user tttMelody commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    It's an important feature; Esper and Siddhi both support it. I hope it will be released as soon as possible.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    @tillrohrmann, could you please take a look at this PR?




[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Closed.



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97790385
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -130,26 +114,166 @@
     			}
     
     			// add the beginning state
    -			final State<T> beginningState;
    +			State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
    +			addTransitions(beginningState, -1, patterns, states);
    +			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +		}
    +	}
     
    -			if (states.containsKey(BEGINNING_STATE_NAME)) {
    -				beginningState = states.get(BEGINNING_STATE_NAME);
    -			} else {
    -				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -				states.put(BEGINNING_STATE_NAME, beginningState);
    -			}
    +	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
     
    -			beginningState.addStateTransition(new StateTransition<T>(
    +		if (shouldRepeatPattern(patternPos, patterns)) {
    +			expandRepeatingPattern(currentState, patternPos, patterns, states);
    +		} else {
    +			currentState.addStateTransition(new StateTransition<T>(
     				StateTransitionAction.TAKE,
    -				currentState,
    -				(FilterFunction<T>) currentPattern.getFilterFunction()
    +				succeedingState,
    +				(FilterFunction<T>) succeedingPattern.getFilterFunction()
     			));
     
    -			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +			if (shouldAddSelfTransition(succeedingPattern))  {
    +				addTransitionToSelf(succeedingPattern, succeedingState);
    +			}
    +			if (isPatternOptional(succeedingPattern)) {
    +				addOptionalTransitions(currentState, patternPos, patterns, states);
    +			}
    +		}
    +	}
    +
    +	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
    +
    +		for (int optionalPatternPos = patternPos + 2;
    +				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
    +				optionalPatternPos++) {
    +
    +			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
    +			State<T> optionalState = states.get(optionalPattern.getName());
    +			currentState.addStateTransition(new StateTransition<>(
    +				StateTransitionAction.TAKE,
    +				optionalState,
    +				(FilterFunction<T>) optionalPattern.getFilterFunction()));
     		}
     	}
     
     	/**
    +	 * Expand a pattern number of times and connect expanded states. E.g. count(3) wil result in:
    +	 *
    +	 * +-----+  +-------+  +-------+
    +	 * |State+->|State#1+->|State#2+
    +	 * +--+--+  +-------+  +--+----+
    +	 */
    +	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
    +													ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
    +		Pattern<T, ?> currentPattern = patterns.get(patternPos);
    +
    +		State<T> currentRepeatingState = null;
    +		State<T> nextRepeatingState = currentState;
    +		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
    +			currentRepeatingState = nextRepeatingState;
    +			nextRepeatingState = new State<>(
    +				currentState.getName() + "#" + i,
    +				State.StateType.Normal);
    +			states.put(nextRepeatingState.getName(), nextRepeatingState);
    +			currentRepeatingState.addStateTransition(new StateTransition<T>(
    +				StateTransitionAction.TAKE,
    +				nextRepeatingState,
    +				(FilterFunction<T>) currentPattern.getFilterFunction()));
    +
    +			// Add a transition around optional pattern.
    +			// count(2,3) will result in:
    +			// +-----+  +-------+   +-------+  +----+
    +			// |State+->|State#1+-->|State#2+->|Next|
    +			// +--+--+  +-------+   +--+----+  +-+--+
    +			//              |                    ^
    +			//              +--------------------+
    +			if (i >= currentPattern.getMinCount()) {
    +				currentRepeatingState.addStateTransition(new StateTransition<T>(
    +					StateTransitionAction.TAKE,
    +					succeedingState,
    +					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +			}
    +		}
    +		nextRepeatingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
    +		if (patternPos == -1) {
    +			return false;
    +		}
    +
    +		Pattern<T, ?> pattern = patterns.get(patternPos);
    +		return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
    +	}
    +
    +	private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) {
    +		succeedingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) {
    +		return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY
    +			|| succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY;
    +	}
    +
    +	private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
    +		int pos = startPos;
    +		for (; pos < patterns.size(); pos++) {
    +			Pattern<T, ?> pattern = patterns.get(pos);
    +			if (!isPatternOptional(pattern)) {
    +				return pos;
    +			}
    +		}
    +
    +		return pos;
    +	}
    +
    +	private static <T> Map<String, State<T>> createStatesFrom(ArrayList<Pattern<T, ?>> patterns) {
    +		Map<String, State<T>> states = new HashMap<>();
    +
    +		boolean foundNonOptionalPattern = false;
    +		for (int i = patterns.size() - 1; i >= 0; i--) {
    +			Pattern<T, ?> pattern = patterns.get(i);
    +			State.StateType stateType = foundNonOptionalPattern ? State.StateType.Normal
    +																: State.StateType.Final;
    +			State<T> newState = new State<>(pattern.getName(), stateType);
    +			foundNonOptionalPattern |= !isPatternOptional(pattern);
    +			states.put(newState.getName(), newState);
    +		}
    +
    +		State<T> beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    +		states.put(BEGINNING_STATE_NAME, beginningState);
    +		return states;
    +	}
    +
    +	private static <T> boolean isPatternOptional(Pattern<T, ?> pattern) {
    +		return pattern.getQuantifier() == Quantifier.ZERO_OR_MANY
    +			|| pattern.getQuantifier() == Quantifier.OPTIONAL
    +			|| pattern.getMinCount() == 0;
    +	}
    +
    +	private static <T> ArrayList<Pattern<T, ?>> createPatternsList(Pattern<T, ?> pattern) {
    --- End diff --
    
    The return type can become a `List` instead of `ArrayList`.
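
The count expansion quoted in the diff above (`expandRepeatingPattern`) may be easier to follow in a simplified form. The following is only a sketch that models states as plain strings and transitions as `"from->to"` edges; the class and method names are hypothetical and are not part of Flink. It assumes, as in the diff, that the starting state already represents one occurrence of the pattern.

```java
import java.util.ArrayList;
import java.util.List;

public class CountExpansionSketch {

    /**
     * Lists the edges created when a state is repeated between minCount and
     * maxCount times before moving on to the next state (hypothetical model).
     */
    public static List<String> expand(String state, String next, int minCount, int maxCount) {
        List<String> edges = new ArrayList<>();
        String current = state;
        for (int i = 1; i < maxCount; i++) {
            String repeated = state + "#" + i;
            // Chain the copies: State -> State#1 -> State#2 -> ...
            edges.add(current + "->" + repeated);
            // Once the minimum count is reached, allow leaving the chain early.
            if (i >= minCount) {
                edges.add(current + "->" + next);
            }
            current = repeated;
        }
        // The last copy always leads to the succeeding state.
        edges.add(current + "->" + next);
        return edges;
    }

    public static void main(String[] args) {
        // count(2,3): [State->State#1, State#1->State#2, State#1->Next, State#2->Next]
        System.out.println(expand("State", "Next", 2, 3));
    }
}
```

For `count(2,3)` the early exit from `State#1` corresponds to the bypass arrow drawn in the diff's comment.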



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Well, for iii), the idea behind the new type of state is that the `NFA` will see one `PotentiallyFinal` element but will continue receiving elements until the first non-eligible one. So if the pattern says something like `'a'.oneOrMany`, the `a` will be `PotentiallyFinal` with a self-loop, and the `NFA` will have to set a flag, e.g. `canTerminate`, to `true`. Then another `a` arrives, which is still valid, so the `NFA` accepts it, and then a `b`. In this case, the `NFA` should declare the previous elements a matching pattern, emit it, and then continue with the `b`. The current `NFA` would not recognize the `PotentiallyFinal` state.
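
The behaviour described here can be illustrated with a small stand-alone sketch. It is not the Flink `NFA`: events are modelled as characters, the pattern is hard-coded to "one or more `a`", and the class and method names are hypothetical. Only the `canTerminate` flag is taken from the comment above.

```java
import java.util.ArrayList;
import java.util.List;

public class PotentiallyFinalSketch {

    /**
     * Collects runs of 'a' and emits a run only when the first
     * non-eligible event arrives (hypothetical simplification).
     */
    public static List<String> matchOneOrMany(String events) {
        List<String> matches = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        // Set once the "potentially final" state has been reached.
        boolean canTerminate = false;

        for (char event : events.toCharArray()) {
            if (event == 'a') {
                // Self-loop: keep accepting eligible events.
                current.append(event);
                canTerminate = true;
            } else if (canTerminate) {
                // First non-eligible event: emit what was collected so far.
                matches.add(current.toString());
                current.setLength(0);
                canTerminate = false;
                // A full NFA would now continue processing this event;
                // this sketch has no further pattern, so it is skipped.
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        System.out.println(matchOneOrMany("aab"));   // [aa]
        System.out.println(matchOneOrMany("aabab")); // [aa, a]
    }
}
```

Note that in this sketch a run of `a`'s at the very end of the input is never emitted, because no non-eligible event arrives to trigger the emission. How a real implementation should handle that case is not settled in this thread.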



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @kl0u 
    
    Sorry for the long delay. I don't have any free time to allocate to contribute to Flink, so I don't mind if @dawidwys works on this.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @mushketyk, are you still working on this issue?
    If not, @dawidwys would also like to work on it.
    
    Please let me know what you think.



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk closed the pull request at:

    https://github.com/apache/flink/pull/2361



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @mushketyk ! Thanks a lot for the reply! 



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97789351
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---
    @@ -43,7 +43,14 @@ public State(final String name, final StateType stateType) {
     		this.name = name;
     		this.stateType = stateType;
     
    -		stateTransitions = new ArrayList<StateTransition<T>>();
    +		stateTransitions = new ArrayList<>();
    +	}
    +
    +	public State(String name, StateType stateType, Collection<StateTransition<T>> stateTransitions) {
    --- End diff --
    
    I agree with @chermenin and this class can be reverted to its previous state. In general, PRs should have the smallest diff possible in order to be easier to review.



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97789900
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -130,26 +114,166 @@
     			}
     
     			// add the beginning state
    -			final State<T> beginningState;
    +			State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
    +			addTransitions(beginningState, -1, patterns, states);
    +			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +		}
    +	}
     
    -			if (states.containsKey(BEGINNING_STATE_NAME)) {
    -				beginningState = states.get(BEGINNING_STATE_NAME);
    -			} else {
    -				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -				states.put(BEGINNING_STATE_NAME, beginningState);
    -			}
    +	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    --- End diff --
    
    The `patterns` can become a `List` instead of `ArrayList`.



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97789775
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -80,25 +82,17 @@
     			// return a factory for empty NFAs
     			return new NFAFactoryImpl<T>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
     		} else {
    +			ArrayList<Pattern<T, ?>> patterns = createPatternsList(pattern);
    --- End diff --
    
    The `patterns` can become a `List` instead of `ArrayList`. It is good to have the most generic type possible as argument or return type, as implementations may change.
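
As a rough illustration of this advice, the sketch below declares both the return type and the parameter as `List`, so the concrete implementation can change without touching callers. The class and method names are hypothetical stand-ins (strings replace `Pattern<T, ?>`), not code from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

public class InterfaceTypeSketch {

    /** Hypothetical stand-in for createPatternsList: exposes only the List interface. */
    public static List<String> createNamesList(String... names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            result.add(name);
        }
        return result;
    }

    /** Accepts any List implementation, not just ArrayList. */
    public static String lastName(List<String> names) {
        return names.get(names.size() - 1);
    }

    public static void main(String[] args) {
        List<String> fromFactory = createNamesList("start", "middle", "end");
        List<String> linked = new LinkedList<>(fromFactory);
        // Both calls compile and behave the same because the parameter is a List.
        System.out.println(lastName(fromFactory)); // end
        System.out.println(lastName(linked));      // end
    }
}
```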



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @mushketyk ! Yes. In this case we would expect to have everything apart from the "end" event in the result, right?



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @kl0u,
    
    Yes, you are right. It should work fine without the "end" event, and if it does not, it is a bug.
    I'll take a look at this in the next few days, rebase the PR and address your other comments.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    I've added support for zeroToMany, oneToMany, optional and count quantifiers in CEP patterns.
    I had to change logic of NFACompiler quite a bit to accommodate new 



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Could someone please take a look at this PR? It has been here without a review for more than 2 weeks.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by chermenin <gi...@git.apache.org>.
Github user chermenin commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    @mushketyk It seems this PR needs to be rebased.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Let's see if I understand correctly the test case you are describing. Do you mean that a pattern like:
    
    ```java
    begin().oneOrMany().where("a")
      .next().optional().where("b") 
    ```
    
    won't work using the current `NFA`, but that as a result of this change, given the sequence "aaab", the `NFA` should emit two matching patterns: "aaa" and "aaab"?
    




[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    I was talking about a pattern like `begin().oneOrMany().where("a")`. So that you just expect as many consecutive `a`'s as possible.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Got it. Let me check how it works with the current code, and I'll come back with a proposed solution.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Sounds good! You can also think about the API changes I proposed. I think they simplify the user-facing commands and remove some long and not so elegant if-blocks that check the type of quantifier.



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk closed the pull request at:

    https://github.com/apache/flink/pull/2361



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @kl0u,
    
    Sorry for the misunderstanding, I still don't understand the problem :)
    Let's select a test and go through it. For example, in the test `testOneOrMoreCEPPattern` from `CEPComplexPatternsITCase` there is a pattern that has the `oneOrMany` and the `end` statements:
    
    ```java
    .next("middle").oneOrMany().where(
     				new FilterFunction<Event>() {
     
     					@Override
     					public boolean filter(Event value) throws Exception {
     						return value.getName().equals("middle");
     					}
     				}
     			)
     			.next("end").where(new FilterFunction<Event>() {
     
     				@Override
     				public boolean filter(Event value) throws Exception {
     					return value.getName().equals("end");
     				}
     			});
    ```
    
    Are you saying that if I remove this part:
    
    ```java
     			.next("end").where(new FilterFunction<Event>() {
     
     				@Override
     				public boolean filter(Event value) throws Exception {
     					return value.getName().equals("end");
     				}
     			});
    ```
    
    the pattern won't work as expected?
    Is this your point?



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97790105
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -130,26 +114,166 @@
     			}
     
     			// add the beginning state
    -			final State<T> beginningState;
    +			State<T> beginningState = states.get(BEGINNING_STATE_NAME);;
    +			addTransitions(beginningState, -1, patterns, states);
    +			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +		}
    +	}
     
    -			if (states.containsKey(BEGINNING_STATE_NAME)) {
    -				beginningState = states.get(BEGINNING_STATE_NAME);
    -			} else {
    -				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -				states.put(BEGINNING_STATE_NAME, beginningState);
    -			}
    +	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
     
    -			beginningState.addStateTransition(new StateTransition<T>(
    +		if (shouldRepeatPattern(patternPos, patterns)) {
    +			expandRepeatingPattern(currentState, patternPos, patterns, states);
    +		} else {
    +			currentState.addStateTransition(new StateTransition<T>(
     				StateTransitionAction.TAKE,
    -				currentState,
    -				(FilterFunction<T>) currentPattern.getFilterFunction()
    +				succeedingState,
    +				(FilterFunction<T>) succeedingPattern.getFilterFunction()
     			));
     
    -			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +			if (shouldAddSelfTransition(succeedingPattern))  {
    +				addTransitionToSelf(succeedingPattern, succeedingState);
    +			}
    +			if (isPatternOptional(succeedingPattern)) {
    +				addOptionalTransitions(currentState, patternPos, patterns, states);
    +			}
    +		}
    +	}
    +
    +	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
    +
    +		for (int optionalPatternPos = patternPos + 2;
    +				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
    +				optionalPatternPos++) {
    +
    +			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
    +			State<T> optionalState = states.get(optionalPattern.getName());
    +			currentState.addStateTransition(new StateTransition<>(
    +				StateTransitionAction.TAKE,
    +				optionalState,
    +				(FilterFunction<T>) optionalPattern.getFilterFunction()));
     		}
     	}
     
     	/**
    +	 * Expand a pattern a number of times and connect the expanded states. E.g. count(3) will result in:
    +	 *
    +	 * +-----+  +-------+  +-------+
    +	 * |State+->|State#1+->|State#2+
    +	 * +--+--+  +-------+  +--+----+
    +	 */
    +	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
    +													ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
    +		Pattern<T, ?> currentPattern = patterns.get(patternPos);
    +
    +		State<T> currentRepeatingState = null;
    +		State<T> nextRepeatingState = currentState;
    +		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
    +			currentRepeatingState = nextRepeatingState;
    +			nextRepeatingState = new State<>(
    +				currentState.getName() + "#" + i,
    +				State.StateType.Normal);
    +			states.put(nextRepeatingState.getName(), nextRepeatingState);
    +			currentRepeatingState.addStateTransition(new StateTransition<T>(
    +				StateTransitionAction.TAKE,
    +				nextRepeatingState,
    +				(FilterFunction<T>) currentPattern.getFilterFunction()));
    +
    +			// Add a transition around optional pattern.
    +			// count(2,3) will result in:
    +			// +-----+  +-------+   +-------+  +----+
    +			// |State+->|State#1+-->|State#2+->|Next|
    +			// +--+--+  +-------+   +--+----+  +-+--+
    +			//              |                    ^
    +			//              +--------------------+
    +			if (i >= currentPattern.getMinCount()) {
    +				currentRepeatingState.addStateTransition(new StateTransition<T>(
    +					StateTransitionAction.TAKE,
    +					succeedingState,
    +					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +			}
    +		}
    +		nextRepeatingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
    +		if (patternPos == -1) {
    +			return false;
    +		}
    +
    +		Pattern<T, ?> pattern = patterns.get(patternPos);
    +		return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
    +	}
    +
    +	private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) {
    +		succeedingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) {
    +		return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY
    +			|| succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY;
    +	}
    +
    +	private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
    --- End diff --
    
    The `patterns` parameter can be declared as a `List` instead of an `ArrayList`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by mushketyk <gi...@git.apache.org>.
Github user mushketyk commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @kl0u,
    
    Thank you for your suggestions.
    
    I don't have a concrete solution yet, but I can try to fill the gaps in testing and see whether I can come up with a better one.
    
    Speaking of testing, can you think of any other test cases that I should cover?
    
    i) Seems like a good idea. I'll add more tests and see if that helps.
    ii) Good point. I'll work on this.
    iii) I am not sure why the NFA must be changed. I think we can build the NFA with the current state types by adding the necessary transitions around optional states. What do you think?
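The idea in point iii) of adding transitions around optional states can be illustrated with a minimal, self-contained sketch. It mirrors the index arithmetic of `addOptionalTransitions` from the diff but is not the PR's code: patterns are plain strings, a trailing `?` marks a pattern as optional, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the PR's addOptionalTransitions. Assumption of this
// sketch: a pattern is just a name, and a trailing '?' marks it as optional.
final class OptionalTransitionsSketch {

    static boolean isOptional(String pattern) {
        return pattern.endsWith("?");
    }

    // Index of the first non-optional pattern at or after startPos,
    // or patterns.size() if every remaining pattern is optional.
    static int findFirstNonOptional(List<String> patterns, int startPos) {
        for (int pos = startPos; pos < patterns.size(); pos++) {
            if (!isOptional(patterns.get(pos))) {
                return pos;
            }
        }
        return patterns.size();
    }

    // Targets of the extra transitions that let the state at patternPos
    // jump over its optional successors (patternPos == -1 is the begin state).
    static List<String> skipTargets(List<String> patterns, int patternPos) {
        List<String> targets = new ArrayList<>();
        int firstNonOptional = findFirstNonOptional(patterns, patternPos + 1);
        int end = Math.min(firstNonOptional + 1, patterns.size());
        for (int pos = patternPos + 2; pos < end; pos++) {
            targets.add(patterns.get(pos));
        }
        return targets;
    }

    public static void main(String[] args) {
        List<String> patterns = Arrays.asList("a", "b?", "c?", "d");
        // Besides the regular transition a -> b?, state "a" may skip ahead.
        System.out.println(skipTargets(patterns, 0));
    }
}
```

In this sketch, a pattern list that ends in an optional pattern (for example `a`, `b?`) yields no skip targets for `a`, because there is no later state to jump to. That appears to be the case the review discussion about an optional final pattern is concerned with.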




[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97790271
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -130,26 +114,166 @@
     			}
     
     			// add the beginning state
    -			final State<T> beginningState;
    +			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
    +			addTransitions(beginningState, -1, patterns, states);
    +			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +		}
    +	}
     
    -			if (states.containsKey(BEGINNING_STATE_NAME)) {
    -				beginningState = states.get(BEGINNING_STATE_NAME);
    -			} else {
    -				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -				states.put(BEGINNING_STATE_NAME, beginningState);
    -			}
    +	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
     
    -			beginningState.addStateTransition(new StateTransition<T>(
    +		if (shouldRepeatPattern(patternPos, patterns)) {
    +			expandRepeatingPattern(currentState, patternPos, patterns, states);
    +		} else {
    +			currentState.addStateTransition(new StateTransition<T>(
     				StateTransitionAction.TAKE,
    -				currentState,
    -				(FilterFunction<T>) currentPattern.getFilterFunction()
    +				succeedingState,
    +				(FilterFunction<T>) succeedingPattern.getFilterFunction()
     			));
     
    -			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +			if (shouldAddSelfTransition(succeedingPattern))  {
    +				addTransitionToSelf(succeedingPattern, succeedingState);
    +			}
    +			if (isPatternOptional(succeedingPattern)) {
    +				addOptionalTransitions(currentState, patternPos, patterns, states);
    +			}
    +		}
    +	}
    +
    +	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
    +
    +		for (int optionalPatternPos = patternPos + 2;
    +				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
    +				optionalPatternPos++) {
    +
    +			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
    +			State<T> optionalState = states.get(optionalPattern.getName());
    +			currentState.addStateTransition(new StateTransition<>(
    +				StateTransitionAction.TAKE,
    +				optionalState,
    +				(FilterFunction<T>) optionalPattern.getFilterFunction()));
     		}
     	}
     
     	/**
    +	 * Expand a pattern a number of times and connect the expanded states. E.g. count(3) will result in:
    +	 *
    +	 * +-----+  +-------+  +-------+
    +	 * |State+->|State#1+->|State#2+
    +	 * +--+--+  +-------+  +--+----+
    +	 */
    +	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
    +													ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
    +		Pattern<T, ?> currentPattern = patterns.get(patternPos);
    +
    +		State<T> currentRepeatingState = null;
    +		State<T> nextRepeatingState = currentState;
    +		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
    +			currentRepeatingState = nextRepeatingState;
    +			nextRepeatingState = new State<>(
    +				currentState.getName() + "#" + i,
    +				State.StateType.Normal);
    +			states.put(nextRepeatingState.getName(), nextRepeatingState);
    +			currentRepeatingState.addStateTransition(new StateTransition<T>(
    +				StateTransitionAction.TAKE,
    +				nextRepeatingState,
    +				(FilterFunction<T>) currentPattern.getFilterFunction()));
    +
    +			// Add a transition around optional pattern.
    +			// count(2,3) will result in:
    +			// +-----+  +-------+   +-------+  +----+
    +			// |State+->|State#1+-->|State#2+->|Next|
    +			// +--+--+  +-------+   +--+----+  +-+--+
    +			//              |                    ^
    +			//              +--------------------+
    +			if (i >= currentPattern.getMinCount()) {
    +				currentRepeatingState.addStateTransition(new StateTransition<T>(
    +					StateTransitionAction.TAKE,
    +					succeedingState,
    +					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +			}
    +		}
    +		nextRepeatingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
    +		if (patternPos == -1) {
    +			return false;
    +		}
    +
    +		Pattern<T, ?> pattern = patterns.get(patternPos);
    +		return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
    +	}
    +
    +	private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) {
    +		succeedingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) {
    +		return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY
    +			|| succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY;
    +	}
    +
    +	private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
    +		int pos = startPos;
    +		for (; pos < patterns.size(); pos++) {
    +			Pattern<T, ?> pattern = patterns.get(pos);
    +			if (!isPatternOptional(pattern)) {
    +				return pos;
    +			}
    +		}
    +
    +		return pos;
    +	}
    +
    +	private static <T> Map<String, State<T>> createStatesFrom(ArrayList<Pattern<T, ?>> patterns) {
    --- End diff --
    
    The `patterns` parameter can be declared as a `List` instead of an `ArrayList`.



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97790026
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -130,26 +114,166 @@
     			}
     
     			// add the beginning state
    -			final State<T> beginningState;
    +			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
    +			addTransitions(beginningState, -1, patterns, states);
    +			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +		}
    +	}
     
    -			if (states.containsKey(BEGINNING_STATE_NAME)) {
    -				beginningState = states.get(BEGINNING_STATE_NAME);
    -			} else {
    -				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -				states.put(BEGINNING_STATE_NAME, beginningState);
    -			}
    +	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
     
    -			beginningState.addStateTransition(new StateTransition<T>(
    +		if (shouldRepeatPattern(patternPos, patterns)) {
    +			expandRepeatingPattern(currentState, patternPos, patterns, states);
    +		} else {
    +			currentState.addStateTransition(new StateTransition<T>(
     				StateTransitionAction.TAKE,
    -				currentState,
    -				(FilterFunction<T>) currentPattern.getFilterFunction()
    +				succeedingState,
    +				(FilterFunction<T>) succeedingPattern.getFilterFunction()
     			));
     
    -			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +			if (shouldAddSelfTransition(succeedingPattern))  {
    +				addTransitionToSelf(succeedingPattern, succeedingState);
    +			}
    +			if (isPatternOptional(succeedingPattern)) {
    +				addOptionalTransitions(currentState, patternPos, patterns, states);
    +			}
    +		}
    +	}
    +
    +	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
    +
    +		for (int optionalPatternPos = patternPos + 2;
    +				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
    +				optionalPatternPos++) {
    +
    +			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
    +			State<T> optionalState = states.get(optionalPattern.getName());
    +			currentState.addStateTransition(new StateTransition<>(
    +				StateTransitionAction.TAKE,
    +				optionalState,
    +				(FilterFunction<T>) optionalPattern.getFilterFunction()));
     		}
     	}
     
     	/**
    +	 * Expand a pattern a number of times and connect the expanded states. E.g. count(3) will result in:
    +	 *
    +	 * +-----+  +-------+  +-------+
    +	 * |State+->|State#1+->|State#2+
    +	 * +--+--+  +-------+  +--+----+
    +	 */
    +	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
    +													ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    --- End diff --
    
    The `patterns` parameter can be declared as a `List` instead of an `ArrayList`.
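For readers following the diff, the unrolling done by `expandRepeatingPattern` can be reproduced in a small standalone sketch. It keeps the loop bounds and the `i >= minCount` check from the diff, but replaces Flink's `State` and `StateTransition` with plain strings; the class name, method name, and the `"source -> target"` encoding are assumptions made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the unrolling loop: states are names and each
// transition is recorded as the string "source -> target".
final class RepeatExpansionSketch {

    static List<String> expand(String stateName, String nextName, int minCount, int maxCount) {
        List<String> transitions = new ArrayList<>();
        String next = stateName;
        for (int i = 1; i < maxCount; i++) {
            String current = next;
            next = stateName + "#" + i;
            // Chain the repeated states: State -> State#1 -> State#2 ...
            transitions.add(current + " -> " + next);
            // Once the minimum count is reached, allow leaving the chain early.
            if (i >= minCount) {
                transitions.add(current + " -> " + nextName);
            }
        }
        // The last repeated state always leads to the succeeding state.
        transitions.add(next + " -> " + nextName);
        return transitions;
    }

    public static void main(String[] args) {
        // count(2,3), as in the diagram in the diff's inline comment
        for (String transition : expand("State", "Next", 2, 3)) {
            System.out.println(transition);
        }
    }
}
```

For `count(2,3)` the sketch records `State -> State#1`, `State#1 -> State#2`, `State#1 -> Next` and `State#2 -> Next`, which matches the diagram in the diff's inline comment.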



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97789988
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -130,26 +114,166 @@
     			}
     
     			// add the beginning state
    -			final State<T> beginningState;
    +			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
    +			addTransitions(beginningState, -1, patterns, states);
    +			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +		}
    +	}
     
    -			if (states.containsKey(BEGINNING_STATE_NAME)) {
    -				beginningState = states.get(BEGINNING_STATE_NAME);
    -			} else {
    -				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -				states.put(BEGINNING_STATE_NAME, beginningState);
    -			}
    +	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
     
    -			beginningState.addStateTransition(new StateTransition<T>(
    +		if (shouldRepeatPattern(patternPos, patterns)) {
    +			expandRepeatingPattern(currentState, patternPos, patterns, states);
    +		} else {
    +			currentState.addStateTransition(new StateTransition<T>(
     				StateTransitionAction.TAKE,
    -				currentState,
    -				(FilterFunction<T>) currentPattern.getFilterFunction()
    +				succeedingState,
    +				(FilterFunction<T>) succeedingPattern.getFilterFunction()
     			));
     
    -			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +			if (shouldAddSelfTransition(succeedingPattern))  {
    +				addTransitionToSelf(succeedingPattern, succeedingState);
    +			}
    +			if (isPatternOptional(succeedingPattern)) {
    +				addOptionalTransitions(currentState, patternPos, patterns, states);
    +			}
    +		}
    +	}
    +
    +	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    --- End diff --
    
    The `patterns` parameter can be declared as a `List` instead of an `ArrayList`.



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @mushketyk, could you close this PR?



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by mushketyk <gi...@git.apache.org>.
GitHub user mushketyk reopened a pull request:

    https://github.com/apache/flink/pull/2361

    [FLINK-3318][cep] Add support for quantifiers to CEP's pattern API

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [x] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mushketyk/flink cep-operators

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2361.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2361
    
----
commit 96c077a4c1c2c1ccba678ec863775402554d6dcf
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-08-05T20:05:39Z

    [FLINK-3318][cep] Add support for quantifiers to CEP's pattern API

commit 425fa3d94d15ea6b1396e7bf7a901f7318f107b0
Author: Ivan Mushketyk <iv...@gmail.com>
Date:   2016-08-11T21:10:43Z

    [FLINK-3318][cep] Add documentation about pattern quantifiers

----



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by chermenin <gi...@git.apache.org>.
Github user chermenin commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97264252
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java ---
    @@ -43,7 +43,14 @@ public State(final String name, final StateType stateType) {
     		this.name = name;
     		this.stateType = stateType;
     
    -		stateTransitions = new ArrayList<StateTransition<T>>();
    +		stateTransitions = new ArrayList<>();
    +	}
    +
    +	public State(String name, StateType stateType, Collection<StateTransition<T>> stateTransitions) {
    --- End diff --
    
    It seems that this constructor is never used. What is it for?



[GitHub] flink pull request #2361: [FLINK-3318][cep] Add support for quantifiers to C...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2361#discussion_r97790228
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ---
    @@ -130,26 +114,166 @@
     			}
     
     			// add the beginning state
    -			final State<T> beginningState;
    +			State<T> beginningState = states.get(BEGINNING_STATE_NAME);
    +			addTransitions(beginningState, -1, patterns, states);
    +			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +		}
    +	}
     
    -			if (states.containsKey(BEGINNING_STATE_NAME)) {
    -				beginningState = states.get(BEGINNING_STATE_NAME);
    -			} else {
    -				beginningState = new State<>(BEGINNING_STATE_NAME, State.StateType.Start);
    -				states.put(BEGINNING_STATE_NAME, beginningState);
    -			}
    +	private static <T> void addTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
     
    -			beginningState.addStateTransition(new StateTransition<T>(
    +		if (shouldRepeatPattern(patternPos, patterns)) {
    +			expandRepeatingPattern(currentState, patternPos, patterns, states);
    +		} else {
    +			currentState.addStateTransition(new StateTransition<T>(
     				StateTransitionAction.TAKE,
    -				currentState,
    -				(FilterFunction<T>) currentPattern.getFilterFunction()
    +				succeedingState,
    +				(FilterFunction<T>) succeedingPattern.getFilterFunction()
     			));
     
    -			return new NFAFactoryImpl<T>(inputTypeSerializer, windowTime, new HashSet<>(states.values()), timeoutHandling);
    +			if (shouldAddSelfTransition(succeedingPattern))  {
    +				addTransitionToSelf(succeedingPattern, succeedingState);
    +			}
    +			if (isPatternOptional(succeedingPattern)) {
    +				addOptionalTransitions(currentState, patternPos, patterns, states);
    +			}
    +		}
    +	}
    +
    +	private static <T> void addOptionalTransitions(State<T> currentState, int patternPos, ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		int firstNonOptionalPattern = findFirstNonOptionalPattern(patterns, patternPos + 1);
    +
    +		for (int optionalPatternPos = patternPos + 2;
    +				optionalPatternPos < Math.min(firstNonOptionalPattern + 1, patterns.size());
    +				optionalPatternPos++) {
    +
    +			Pattern<T, ?> optionalPattern = patterns.get(optionalPatternPos);
    +			State<T> optionalState = states.get(optionalPattern.getName());
    +			currentState.addStateTransition(new StateTransition<>(
    +				StateTransitionAction.TAKE,
    +				optionalState,
    +				(FilterFunction<T>) optionalPattern.getFilterFunction()));
     		}
     	}
     
     	/**
    +	 * Expand a pattern a number of times and connect the expanded states. E.g. count(3) will result in:
    +	 *
    +	 * +-----+  +-------+  +-------+
    +	 * |State+->|State#1+->|State#2+
    +	 * +--+--+  +-------+  +--+----+
    +	 */
    +	private static <T> void expandRepeatingPattern(State<T> currentState, int patternPos,
    +													ArrayList<Pattern<T, ?>> patterns, Map<String, State<T>> states) {
    +		Pattern<T, ?> succeedingPattern = patterns.get(patternPos + 1);
    +		State<T> succeedingState = states.get(succeedingPattern.getName());
    +		Pattern<T, ?> currentPattern = patterns.get(patternPos);
    +
    +		State<T> currentRepeatingState = null;
    +		State<T> nextRepeatingState = currentState;
    +		for (int i = 1; i < currentPattern.getMaxCount(); i++) {
    +			currentRepeatingState = nextRepeatingState;
    +			nextRepeatingState = new State<>(
    +				currentState.getName() + "#" + i,
    +				State.StateType.Normal);
    +			states.put(nextRepeatingState.getName(), nextRepeatingState);
    +			currentRepeatingState.addStateTransition(new StateTransition<T>(
    +				StateTransitionAction.TAKE,
    +				nextRepeatingState,
    +				(FilterFunction<T>) currentPattern.getFilterFunction()));
    +
    +			// Add a transition around optional pattern.
    +			// count(2,3) will result in:
    +			// +-----+  +-------+   +-------+  +----+
    +			// |State+->|State#1+-->|State#2+->|Next|
    +			// +--+--+  +-------+   +--+----+  +-+--+
    +			//              |                    ^
    +			//              +--------------------+
    +			if (i >= currentPattern.getMinCount()) {
    +				currentRepeatingState.addStateTransition(new StateTransition<T>(
    +					StateTransitionAction.TAKE,
    +					succeedingState,
    +					(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +			}
    +		}
    +		nextRepeatingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldRepeatPattern(int patternPos, ArrayList<Pattern<T, ?>> patterns) {
    +		if (patternPos == -1) {
    +			return false;
    +		}
    +
    +		Pattern<T, ?> pattern = patterns.get(patternPos);
    +		return pattern.getMinCount() != 1 || pattern.getMaxCount() != 1;
    +	}
    +
    +	private static <T> void addTransitionToSelf(Pattern<T, ?> succeedingPattern, State<T> succeedingState) {
    +		succeedingState.addStateTransition(new StateTransition<T>(
    +			StateTransitionAction.TAKE,
    +			succeedingState,
    +			(FilterFunction<T>) succeedingPattern.getFilterFunction()));
    +	}
    +
    +	private static <T> boolean shouldAddSelfTransition(Pattern<T, ?> succeedingPattern) {
    +		return succeedingPattern.getQuantifier() == Quantifier.ZERO_OR_MANY
    +			|| succeedingPattern.getQuantifier() == Quantifier.ONE_OR_MANY;
    +	}
    +
    +	private static <T> int findFirstNonOptionalPattern(ArrayList<Pattern<T, ?>> patterns, int startPos) {
    +		int pos = startPos;
    +		for (; pos < patterns.size(); pos++) {
    --- End diff --
    
    This can become a `while` loop.
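A minimal sketch of what the `while` form might look like, assuming a simplified signature: the list holds one Boolean per pattern saying whether that pattern is optional, standing in for `isPatternOptional(patterns.get(pos))`. The class name and this representation are hypothetical, not part of the PR.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical simplification: optional.get(pos) plays the role of
// isPatternOptional(patterns.get(pos)) in the PR.
final class WhileLoopSketch {

    // Advances past optional patterns and returns the index of the first
    // non-optional one, or optional.size() if none remains.
    static int findFirstNonOptional(List<Boolean> optional, int startPos) {
        int pos = startPos;
        while (pos < optional.size() && optional.get(pos)) {
            pos++;
        }
        return pos;
    }

    public static void main(String[] args) {
        List<Boolean> optional = Arrays.asList(true, true, false, true);
        System.out.println(findFirstNonOptional(optional, 0));
    }
}
```

The single `return pos` covers both outcomes of the original `for` loop, since the loop exits either at the first non-optional position or at the end of the list.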



[GitHub] flink issue #2361: [FLINK-3318][cep] Add support for quantifiers to CEP's pa...

Posted by kl0u <gi...@git.apache.org>.
Github user kl0u commented on the issue:

    https://github.com/apache/flink/pull/2361
  
    Hi @mushketyk! Thanks a lot!
    
    The problem is in the `NFACompiler`, and more specifically in the way the pattern is compiled. I have not come up with a concrete solution yet, but I would say that:
    
    i) There should be another `StateType` as well, something like `PotentialFinal` (or a better name), and if the final pattern is optional, it gets assigned this `StateType`. This also propagates backwards in the pattern graph when the final pattern is optional (zero to some number of times).
    
    ii) `addTransitions()` should also be extended to account for quantifiers at the end and at the start of the pattern when "unrolling" or expanding the original pattern. For example, `expandRepeatingPattern()` should not always assign `State.StateType.Normal` to the unrolled patterns, as they may be at the end or the start.
    
    iii) The `NFA` should also be modified to account for the new `PotentialFinal` state. In this case, when we see such a pattern, a flag should be set, and for example in the one-to-many case the pattern should be matched when the next non-matching element comes.
    
    Again, these are some initial thoughts. We can discuss the design further if you want, as this seems to be a bigger change than expected ;)
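One possible reading of point i) can be sketched as follows. This is an interpretation, not a design agreed on in the thread: the `PotentialFinal` constant, the backward walk, and the class and method names are all assumptions, and the `Start` type is left out for brevity. Each pattern is reduced to a Boolean saying whether it is optional.

```java
import java.util.Arrays;
import java.util.List;

// Interpretation sketch only: PotentialFinal does not exist in Flink's
// State.StateType; it is the hypothetical type proposed in the comment above.
final class OptionalTailSketch {

    enum StateType { Normal, Final, PotentialFinal }

    // Walks the pattern list backwards. A state is PotentialFinal when the
    // match may already be complete there, i.e. when it belongs to an optional
    // final pattern or when every pattern after it is optional.
    static List<StateType> assignTypes(List<Boolean> optional) {
        StateType[] types = new StateType[optional.size()];
        boolean restIsOptional = true; // true while all later patterns are optional
        for (int pos = optional.size() - 1; pos >= 0; pos--) {
            boolean isLast = pos == optional.size() - 1;
            if (isLast) {
                types[pos] = optional.get(pos) ? StateType.PotentialFinal : StateType.Final;
            } else {
                types[pos] = restIsOptional ? StateType.PotentialFinal : StateType.Normal;
            }
            restIsOptional = restIsOptional && optional.get(pos);
        }
        return Arrays.asList(types);
    }

    public static void main(String[] args) {
        // Patterns: a, b, c? (only the final pattern is optional)
        System.out.println(assignTypes(Arrays.asList(false, false, true)));
    }
}
```

Under this reading, a pattern sequence `a`, `b`, `c?` marks both `b` and `c?` as `PotentialFinal`, which is the backward propagation described in point i). How the `NFA` would then decide when to emit a match, as raised in point iii), is outside the scope of this sketch.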

