You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by Aitozi <gi...@git.apache.org> on 2018/06/14 14:28:25 UTC

[GitHub] flink pull request #6168: [FLINK-9588]Reused context with same computation s...

GitHub user Aitozi opened a pull request:

    https://github.com/apache/flink/pull/6168

    [FLINK-9588]Reused context with same computation state calculate

    ## What is the purpose of the change
    
    Now cep checkFilterCondition with a newly created Conditioncontext for each edge, which will result in the repeatable getEventsForPattern due to the init of `shouldUpdate`

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Aitozi/flink context-reuse

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6168.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #6168
    
----
commit ed71ac4407de9d8163efa8c334d9ac0e63e47069
Author: minwenjun <mi...@...>
Date:   2018-06-14T14:24:02Z

    [FLINK-9588]Reused context with same computation state calculate

----


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r197482701
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -609,7 +611,7 @@ public void close() throws Exception {
     							startTimestamp);
     
     					//check if newly created state is optional (have a PROCEED path to Final state)
    -					final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
    +					final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
    --- End diff --
    
    Yes, you are right. Read the code again,  the `TAKE` branch only put the new `Node` to sharedBuffer which just point to the previousNodeId, This indeed don't affect the result of the current CS's partial match. I will take your suggestion


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195738359
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
     
     		final Stack<State<T>> states = new Stack<>();
     		states.push(state);
    +		ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
    --- End diff --
    
    agree


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195719385
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
     
     		final Stack<State<T>> states = new Stack<>();
     		states.push(state);
    +		ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
    --- End diff --
    
    How about creating the context only once at the very beginning of computeNextStates?


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r197461704
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -609,7 +611,7 @@ public void close() throws Exception {
     							startTimestamp);
     
     					//check if newly created state is optional (have a PROCEED path to Final state)
    -					final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
    +					final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
    --- End diff --
    
    Yes that is true the shared buffer might change but only "after" the CS. I mean those changes can't affect partial match for current Computation State. Therefore I would suggest to remove the flag completely and just keep the lazy evaluation of `matchedEvents` in `ConditionContext` by comparing it to null:
    
    	if (matchedEvents == null) {
    		this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState);
    	}



---

[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6168
  
    +1, will merge soon


---

[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on the issue:

    https://github.com/apache/flink/pull/6168
  
    Hi @dawidwys , since this commit has been merged in, is this pr need to be closed by me?


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195719361
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -662,13 +662,13 @@ private void addComputationState(
     			ComputationState computationState) {
     		final Stack<State<T>> statesToCheck = new Stack<>();
     		statesToCheck.push(state);
    -
    +		ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
    --- End diff --
    
    How about creating the context only once at the very beginning of computeNextStates?


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195792448
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -609,7 +611,7 @@ public void close() throws Exception {
     							startTimestamp);
     
     					//check if newly created state is optional (have a PROCEED path to Final state)
    -					final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
    +					final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
    --- End diff --
    
    I think over it again, the content of the `sharedBuffer` make difference  to  the result of the `getEventsForPattern`, so the result should be update with the change of the `sharedBuffer`. But i think we only have to reset the `shouldUpdate` flag to `true` here rather than create a  context again, right? @dawidwys 


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195720052
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
     
     		final Stack<State<T>> states = new Stack<>();
     		states.push(state);
    +		ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
    --- End diff --
    
    Rather than `computationState` just pass `conditionContext` to the `createDecisionGraph` method


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195737755
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
     
     		final Stack<State<T>> states = new Stack<>();
     		states.push(state);
    +		ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
    --- End diff --
    
    I think it needs the `conditionContext` and `computationState` and should replace the `sharedBuffer` with `conditionContext`.


---

[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on the issue:

    https://github.com/apache/flink/pull/6168
  
    Is it ok now ? @dawidwys 


---

[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on the issue:

    https://github.com/apache/flink/pull/6168
  
    please help review this pr @dawidwys , thx.


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi closed the pull request at:

    https://github.com/apache/flink/pull/6168


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195719763
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -662,13 +662,13 @@ private void addComputationState(
     			ComputationState computationState) {
    --- End diff --
    
    Replace `ComputationState` with `ConditionContext`


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195742430
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -609,7 +611,7 @@ public void close() throws Exception {
     							startTimestamp);
     
     					//check if newly created state is optional (have a PROCEED path to Final state)
    -					final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
    +					final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
    --- End diff --
    
    Why don't you use the one created in the beginning?


---

[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...

Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:

    https://github.com/apache/flink/pull/6168
  
    Hi @Aitozi, yes please close this PR, as I made a spelling mistake while merging and that's why it wasn't closed automatically. Thanks!


---

[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...

Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6168#discussion_r195745115
  
    --- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
    @@ -609,7 +611,7 @@ public void close() throws Exception {
     							startTimestamp);
     
     					//check if newly created state is optional (have a PROCEED path to Final state)
    -					final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
    +					final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
    --- End diff --
    
    I think the sharedbuffer has been changed during the `TAKE` branch, the conditionContext should be different.


---