You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/07/05 13:56:08 UTC

[5/5] flink git commit: [FLINK-9588][cep] Reused context with same computation state calculate

[FLINK-9588][cep] Reused context with same computation state calculate

This cloes #6168


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/abd61cfa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/abd61cfa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/abd61cfa

Branch: refs/heads/master
Commit: abd61cfacf62d909a9a6a2d843f3be97a6f629ee
Parents: ce345e3
Author: minwenjun <mi...@didichuxing.com>
Authored: Thu Jun 14 22:24:02 2018 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Thu Jul 5 15:54:54 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 33 +++++++-------------
 1 file changed, 12 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/abd61cfa/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 041a017..276cde7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -536,7 +536,9 @@ public class NFA<T> {
 			final EventWrapper event,
 			final long timestamp) throws Exception {
 
-		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(sharedBuffer, computationState, event.getEvent());
+		final ConditionContext<T> context = new ConditionContext<>(this, sharedBuffer, computationState);
+
+		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
 
 		// Create the computing version based on the previously computed edges
 		// We need to defer the creation of computation states until we know how many edges start
@@ -609,7 +611,7 @@ public class NFA<T> {
 							startTimestamp);
 
 					//check if newly created state is optional (have a PROCEED path to Final state)
-					final State<T> finalState = findFinalStateAfterProceed(sharedBuffer, nextState, event.getEvent(), computationState);
+					final State<T> finalState = findFinalStateAfterProceed(context, nextState, event.getEvent());
 					if (finalState != null) {
 						addComputationState(
 								sharedBuffer,
@@ -656,19 +658,17 @@ public class NFA<T> {
 	}
 
 	private State<T> findFinalStateAfterProceed(
-			SharedBuffer<T> sharedBuffer,
+			ConditionContext<T> context,
 			State<T> state,
-			T event,
-			ComputationState computationState) {
+			T event) {
 		final Stack<State<T>> statesToCheck = new Stack<>();
 		statesToCheck.push(state);
-
 		try {
 			while (!statesToCheck.isEmpty()) {
 				final State<T> currentState = statesToCheck.pop();
 				for (StateTransition<T> transition : currentState.getStateTransitions()) {
 					if (transition.getAction() == StateTransitionAction.PROCEED &&
-							checkFilterCondition(sharedBuffer, computationState, transition.getCondition(), event)) {
+							checkFilterCondition(context, transition.getCondition(), event)) {
 						if (transition.getTargetState().isFinal()) {
 							return transition.getTargetState();
 						} else {
@@ -689,7 +689,7 @@ public class NFA<T> {
 	}
 
 	private OutgoingEdges<T> createDecisionGraph(
-			SharedBuffer<T> sharedBuffer,
+			ConditionContext<T> context,
 			ComputationState computationState,
 			T event) {
 		State<T> state = getState(computationState);
@@ -706,7 +706,7 @@ public class NFA<T> {
 			// check all state transitions for each state
 			for (StateTransition<T> stateTransition : stateTransitions) {
 				try {
-					if (checkFilterCondition(sharedBuffer, computationState, stateTransition.getCondition(), event)) {
+					if (checkFilterCondition(context, stateTransition.getCondition(), event)) {
 						// filter condition is true
 						switch (stateTransition.getAction()) {
 							case PROCEED:
@@ -729,11 +729,10 @@ public class NFA<T> {
 	}
 
 	private boolean checkFilterCondition(
-			SharedBuffer<T> sharedBuffer,
-			ComputationState computationState,
+			ConditionContext<T> context,
 			IterativeCondition<T> condition,
 			T event) throws Exception {
-		return condition == null || condition.filter(event, new ConditionContext<>(this, sharedBuffer, computationState));
+		return condition == null || condition.filter(event, context);
 	}
 
 	/**
@@ -779,12 +778,6 @@ public class NFA<T> {
 	 */
 	private static class ConditionContext<T> implements IterativeCondition.Context<T> {
 
-		/**
-		 * A flag indicating if we should recompute the matching pattern, so that
-		 * the {@link IterativeCondition iterative condition} can be evaluated.
-		 */
-		private boolean shouldUpdate;
-
 		/** The current computation state. */
 		private ComputationState computationState;
 
@@ -806,7 +799,6 @@ public class NFA<T> {
 			this.computationState = computationState;
 			this.nfa = nfa;
 			this.sharedBuffer = sharedBuffer;
-			this.shouldUpdate = true;
 		}
 
 		@Override
@@ -816,9 +808,8 @@ public class NFA<T> {
 			// the (partially) matched pattern is computed lazily when this method is called.
 			// this is to avoid any overheads when using a simple, non-iterative condition.
 
-			if (shouldUpdate) {
+			if (matchedEvents == null) {
 				this.matchedEvents = nfa.extractCurrentMatches(sharedBuffer, computationState);
-				shouldUpdate = false;
 			}
 
 			return new Iterable<T>() {