You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/05 13:56:08 UTC
[5/5] flink git commit: [FLINK-9588][cep] Reused context with same
computation state calculate
[FLINK-9588][cep] Reused context with same computation state calculate
This closes #6168
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abd61cfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abd61cfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abd61cfa
Branch: refs/heads/master
Commit: abd61cfacf62d909a9a6a2d843f3be97a6f629ee
Parents: ce345e3
Author: minwenjun <mi...@didichuxing.com>
Authored: Thu Jun 14 22:24:02 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Thu Jul 5 15:54:54 2018 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/cep/nfa/NFA.java | 33 +++++++-------------
1 file changed, 12 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/abd61cfa/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 041a017..276cde7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -536,7 +536,9 @@ public class NFA<T> {
final EventWrapper event,
final long timestamp) throws Exception {
- final OutgoingEdges<T> outgoingEdges = createDecisionGraph(sharedBuffer, computationState, event.getEvent());
+ final ConditionContext<T> context = new ConditionContext<>(this, sharedBuffer, computationState);
+
+ final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
// Create the computing version based on the previously computed edges
// We need to defer the creation of computation states until we know how many edges start
@@ -609,7 +611,7 @@ public class NFA<T> {
startTimestamp);
//check if newly created state is optional (have a PROCEED path to Final state)
- final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+ final State<T> finalState = findFinalStateAfterProceed(context, nextState, event.getEvent());
if (finalState != null) {
addComputationState(
sharedBuffer,
@@ -656,19 +658,17 @@ public class NFA<T> {
}
private State<T> findFinalStateAfterProceed(
- SharedBuffer<T> sharedBuffer,
+ ConditionContext<T> context,
State<T> state,
- T event,
- ComputationState computationState) {
+ T event) {
final Stack<State<T>> statesToCheck = new Stack<>();
statesToCheck.push(state);
-
try {
while (!statesToCheck.isEmpty()) {
final State<T> currentState = statesToCheck.pop();
for (StateTransition<T> transition : currentState.getStateTransitions()) {
if (transition.getAction() == StateTransitionAction.PROCEED &&
- checkFilterCondition(sharedBuffer, computationState, transition.getCondition(), event)) {
+ checkFilterCondition(context, transition.getCondition(), event)) {
if (transition.getTargetState().isFinal()) {
return transition.getTargetState();
} else {
@@ -689,7 +689,7 @@ public class NFA<T> {
}
private OutgoingEdges<T> createDecisionGraph(
- SharedBuffer<T> sharedBuffer,
+ ConditionContext<T> context,
ComputationState computationState,
T event) {
State<T> state = getState(computationState);
@@ -706,7 +706,7 @@ public class NFA<T> {
// check all state transitions for each state
for (StateTransition<T> stateTransition : stateTransitions) {
try {
- if (checkFilterCondition(sharedBuffer, computationState, stateTransition.getCondition(), event)) {
+ if (checkFilterCondition(context, stateTransition.getCondition(), event)) {
// filter condition is true
switch (stateTransition.getAction()) {
case PROCEED:
@@ -729,11 +729,10 @@ public class NFA<T> {
}
private boolean checkFilterCondition(
- SharedBuffer<T> sharedBuffer,
- ComputationState computationState,
+ ConditionContext<T> context,
IterativeCondition<T> condition,
T event) throws Exception {
- return condition == null || condition.filter(event, new ConditionContext<>(this, sharedBuffer, computationState));
+ return condition == null || condition.filter(event, context);
}
/**
@@ -779,12 +778,6 @@ public class NFA<T> {
*/
private static class ConditionContext<T> implements IterativeCondition.Context<T> {
- /**
- * A flag indicating if we should recompute the matching pattern, so that
- * the {@link IterativeCondition iterative condition} can be evaluated.
- */
- private boolean shouldUpdate;
-
/** The current computation state. */
private ComputationState computationState;
@@ -806,7 +799,6 @@ public class NFA<T> {
this.computationState = computationState;
this.nfa = nfa;
this.sharedBuffer = sharedBuffer;
- this.shouldUpdate = true;
}
@Override
@@ -816,9 +808,8 @@ public class NFA<T> {
// the (partially) matched pattern is computed lazily when this method is called.
// this is to avoid any overheads when using a simple, non-iterative condition.
- if (shouldUpdate) {
+ if (matchedEvents == null) {
this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState);
- shouldUpdate = false;
}
return new Iterable<T>() {