You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by Aitozi <gi...@git.apache.org> on 2018/06/14 14:28:25 UTC
[GitHub] flink pull request #6168: [FLINK-9588]Reused context with same computation s...
GitHub user Aitozi opened a pull request:
https://github.com/apache/flink/pull/6168
[FLINK-9588]Reused context with same computation state calculate
## What is the purpose of the change
Now cep checkFilterCondition with a newly created Conditioncontext for each edge, which will result in the repeatable getEventsForPattern due to the init of `shouldUpdate`
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/Aitozi/flink context-reuse
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/6168.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #6168
----
commit ed71ac4407de9d8163efa8c334d9ac0e63e47069
Author: minwenjun <mi...@...>
Date: 2018-06-14T14:24:02Z
[FLINK-9588]Reused context with same computation state calculate
----
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r197482701
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
//check if newly created state is optional (have a PROCEED path to Final state)
- final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+ final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
--- End diff --
Yes, you are right. Read the code again, the `TAKE` branch only put the new `Node` to sharedBuffer which just point to the previousNodeId, This indeed don't affect the result of the current CS's partial match. I will take your suggestion
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195738359
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
final Stack<State<T>> states = new Stack<>();
states.push(state);
+ ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
--- End diff --
agree
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195719385
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
final Stack<State<T>> states = new Stack<>();
states.push(state);
+ ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
--- End diff --
How about creating the context only once at the very beginning of computeNextStates?
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r197461704
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
//check if newly created state is optional (have a PROCEED path to Final state)
- final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+ final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
--- End diff --
Yes that is true the shared buffer might change but only "after" the CS. I mean those changes can't affect partial match for current Computation State. Therefore I would suggest to remove the flag completely and just keep the lazy evaluation of `matchedEvents` in `ConditionContext` by comparing it to null:
if (matchedEvents == null) {
this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState);
}
---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/6168
+1, will merge soon
---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on the issue:
https://github.com/apache/flink/pull/6168
Hi @dawidwys , since this commit has been merged in, is this pr need to be closed by me?
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195719361
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -662,13 +662,13 @@ private void addComputationState(
ComputationState computationState) {
final Stack<State<T>> statesToCheck = new Stack<>();
statesToCheck.push(state);
-
+ ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
--- End diff --
How about creating the context only once at the very beginning of computeNextStates?
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195792448
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
//check if newly created state is optional (have a PROCEED path to Final state)
- final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+ final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
--- End diff --
I think over it again, the content of the `sharedBuffer` make difference to the result of the `getEventsForPattern`, so the result should be update with the change of the `sharedBuffer`. But i think we only have to reset the `shouldUpdate` flag to `true` here rather than create a context again, right? @dawidwys
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195720052
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
final Stack<State<T>> states = new Stack<>();
states.push(state);
+ ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
--- End diff --
Rather than `computationState` just pass `conditionContext` to the `createDecisionGraph` method
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195737755
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -697,6 +697,7 @@ private int calculateIncreasingSelfState(int ignoreBranches, int takeBranches) {
final Stack<State<T>> states = new Stack<>();
states.push(state);
+ ConditionContext<T> context = new ConditionContext<T>(this, sharedBuffer, computationState);
--- End diff --
I think it needs the `conditionContext` and `computationState` and should replace the `sharedBuffer` with `conditionContext`.
---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on the issue:
https://github.com/apache/flink/pull/6168
Is it ok now ? @dawidwys
---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on the issue:
https://github.com/apache/flink/pull/6168
please help review this pr @dawidwys , thx.
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi closed the pull request at:
https://github.com/apache/flink/pull/6168
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195719763
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -662,13 +662,13 @@ private void addComputationState(
ComputationState computationState) {
--- End diff --
Replace `ComputationState` with `ConditionContext`
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195742430
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
//check if newly created state is optional (have a PROCEED path to Final state)
- final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+ final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
--- End diff --
Why don't you use the one created in the beginning?
---
[GitHub] flink issue #6168: [FLINK-9588][CEP]Reused context with same computation sta...
Posted by dawidwys <gi...@git.apache.org>.
Github user dawidwys commented on the issue:
https://github.com/apache/flink/pull/6168
Hi @Aitozi, yes please close this PR, as I made a spelling mistake while merging and that's why it wasn't closed automatically. Thanks!
---
[GitHub] flink pull request #6168: [FLINK-9588][CEP]Reused context with same computat...
Posted by Aitozi <gi...@git.apache.org>.
Github user Aitozi commented on a diff in the pull request:
https://github.com/apache/flink/pull/6168#discussion_r195745115
--- Diff: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java ---
@@ -609,7 +611,7 @@ public void close() throws Exception {
startTimestamp);
//check if newly created state is optional (have a PROCEED path to Final state)
- final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+ final State<T> finalState = findFinalStateAfterProceed(new ConditionContext<>(this, sharedBuffer, computationState), nextState, event.getEvent());
--- End diff --
I think the sharedbuffer has been changed during the `TAKE` branch, the conditionContext should be different.
---