You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/21 14:41:12 UTC

[GitHub] Aitozi closed pull request #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern is the head of the pattern

Aitozi closed pull request #6124: [FLINK-8914][CEP]Fix wrong semantic when greedy pattern is the head of the pattern
URL: https://github.com/apache/flink/pull/6124
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
index 88ef3d3288a..1685ffacc7b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/ComputationState.java
@@ -91,6 +91,10 @@ public boolean isStartState() {
 		return state.isStart() && event == null;
 	}
 
+	public boolean isGreedyState() {
+		return state.isGreedy();
+	}
+
 	public long getTimestamp() {
 		return timestamp;
 	}
@@ -137,6 +141,19 @@ public int hashCode() {
 		return Objects.hash(state, event, counter, timestamp, version, startTimestamp, previousState);
 	}
 
+	@Override
+	public String toString() {
+		StringBuilder builder = new StringBuilder();
+
+		builder.append("current state: ").append(state).append("\n")
+			.append("previous state: ").append(previousState).append("\n")
+			.append("start timestamp: ").append(startTimestamp).append("\n")
+			.append("counter: ").append(counter).append("\n")
+			.append("version: ").append(version);
+
+		return builder.toString();
+	}
+
 	public static <T> ComputationState<T> createStartState(final NFA<T> nfa, final State<T> state) {
 		Preconditions.checkArgument(state.isStart());
 		return new ComputationState<>(nfa, state, null, null, 0, -1L, new DeweyNumber(1), -1L);
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 5624db9de43..16ea163aadf 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -220,7 +220,10 @@ public void resetNFAChanged() {
 	 */
 	public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(final T event,
 		final long timestamp, AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		List<ComputationState<T>> redundantStart = pickRedundantGreedyStartState(new ArrayList<>(computationStates));
+		computationStates.removeAll(redundantStart);
 		final int numberComputationStates = computationStates.size();
+
 		final Collection<Map<String, List<T>>> result = new ArrayList<>();
 		final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
 
@@ -309,7 +312,7 @@ public void resetNFAChanged() {
 			}
 
 		}
-
+		computationStates.addAll(redundantStart);
 		discardComputationStatesAccordingToStrategy(computationStates, result, afterMatchSkipStrategy);
 
 		// prune shared buffer based on window length
@@ -626,6 +629,29 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
 		return resultingComputationStates;
 	}
 
+	private List<ComputationState<T>> pickRedundantGreedyStartState(List<ComputationState<T>> computationStates) {
+		List<ComputationState<T>> greedyState = new ArrayList<>();
+		List<ComputationState<T>> startState = new ArrayList<>();
+		List<ComputationState<T>> redundantStart = new ArrayList<>();
+		for (ComputationState<T> computationState : computationStates) {
+			if (computationState.isGreedyState()) {
+				greedyState.add(computationState);
+			} else if (computationState.isStartState()) {
+				startState.add(computationState);
+			}
+		}
+
+		for (ComputationState<T> start: startState) {
+			for (ComputationState<T> greedy : greedyState) {
+				if (NFAStateNameHandler.getOriginalNameFromInternal(start.getState().getName()).equals(
+					NFAStateNameHandler.getOriginalNameFromInternal(greedy.getState().getName()))) {
+					redundantStart.add(start);
+				}
+			}
+		}
+		return redundantStart;
+	}
+
 	private void addComputationState(
 			List<ComputationState<T>> computationStates,
 			State<T> currentState,
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 0120398b4b2..b2c80370d0e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -60,6 +60,14 @@ public boolean isStart() {
 		return stateType == StateType.Start;
 	}
 
+	public boolean isStop() {
+		return stateType == StateType.Stop;
+	}
+
+	public boolean isGreedy() {
+		return stateType == StateType.Greedy;
+	}
+
 	public String getName() {
 		return name;
 	}
@@ -131,10 +139,6 @@ public int hashCode() {
 		return Objects.hash(name, stateType, stateTransitions);
 	}
 
-	public boolean isStop() {
-		return stateType == StateType.Stop;
-	}
-
 	/**
 	 * Set of valid state types.
 	 */
@@ -142,6 +146,7 @@ public boolean isStop() {
 		Start, // the state is a starting state for the NFA
 		Final, // the state is a final state for the NFA
 		Normal, // the state is neither a start nor a final state
-		Stop
+		Stop, // reaching this state causes the match to fail
+		Greedy // the state has the greedy property
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index d624f3bb6de..308143d6c72 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -93,7 +93,7 @@ public String toString() {
 				.append("StateTransition(")
 				.append(action).append(", ")
 				.append("from ").append(sourceState.getName())
-				.append("to ").append(targetState.getName())
+				.append(" to ").append(targetState.getName())
 				.append(condition != null ? ", with condition)" : ")")
 				.toString();
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 39e8d34acef..20ccb6b7966 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -573,7 +573,9 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
 				return createGroupPatternState((GroupPattern) currentPattern, sinkState, proceedState, isOptional);
 			}
 
-			final State<T> singletonState = createState(currentPattern.getName(), State.StateType.Normal);
+			State.StateType type = currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) ? State.StateType.Greedy : State.StateType.Normal;
+
+			final State<T> singletonState = createState(currentPattern.getName(), type);
 			// if event is accepted then all notPatterns previous to the optional states are no longer valid
 			final State<T> sink = copyWithoutTransitiveNots(sinkState);
 			singletonState.addTake(sink, takeCondition);
@@ -707,7 +709,9 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
 				true);
 
 			IterativeCondition<T> proceedCondition = getTrueFunction();
-			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
+			State.StateType type = currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY) ? State.StateType.Greedy : State.StateType.Normal;
+
+			final State<T> loopingState = createState(currentPattern.getName(), type);
 
 			if (currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.GREEDY)) {
 				if (untilCondition != null) {
@@ -728,7 +732,7 @@ private boolean isPatternOptional(Pattern<T, ?> pattern) {
 			addStopStateToLooping(loopingState);
 
 			if (ignoreCondition != null) {
-				final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
+				final State<T> ignoreState = createState(currentPattern.getName(), type);
 				ignoreState.addTake(loopingState, takeCondition);
 				ignoreState.addIgnore(ignoreCondition);
 				loopingState.addIgnore(ignoreState, ignoreCondition);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
index 2c7f23c024f..d7198403b8a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
@@ -904,4 +904,110 @@ public boolean filter(Event value) throws Exception {
 			Lists.newArrayList(c, a1, a2, a3, a4, d)
 		));
 	}
+
+	@Test
+	public void testGreedyConsecutiveStartState() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event c = new Event(40, "c", 1.0);
+		Event a1 = new Event(41, "a", 2.0);
+		Event a2 = new Event(42, "a", 2.0);
+		Event a3 = new Event(43, "a", 2.0);
+		Event d = new Event(44, "d", 3.0);
+		Event a4 = new Event(45, "a", 2.0);
+		Event a5 = new Event(46, "a", 2.0);
+		Event d2 = new Event(44, "d", 3.0);
+
+		inputEvents.add(new StreamRecord<>(c, 1));
+		inputEvents.add(new StreamRecord<>(a1, 2));
+		inputEvents.add(new StreamRecord<>(a2, 3));
+		inputEvents.add(new StreamRecord<>(a3, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+		inputEvents.add(new StreamRecord<>(a4, 5));
+		inputEvents.add(new StreamRecord<>(a5, 5));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		// a* d
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		})
+		.oneOrMore()
+		.optional()
+		.consecutive()
+		.greedy()
+		.followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, a2, a3, d),
+			Lists.newArrayList(a4, a5, d2)
+		));
+	}
+
+	@Test
+	public void testGreedyReleaxStartState() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event c = new Event(40, "c", 1.0);
+		Event a1 = new Event(41, "a", 2.0);
+		Event a2 = new Event(42, "a", 2.0);
+		Event a3 = new Event(43, "a", 2.0);
+		Event d = new Event(44, "d", 3.0);
+		Event a4 = new Event(45, "a", 2.0);
+		Event a5 = new Event(46, "a", 2.0);
+		Event d2 = new Event(47, "d", 3.0);
+
+		inputEvents.add(new StreamRecord<>(c, 1));
+		inputEvents.add(new StreamRecord<>(a1, 2));
+		inputEvents.add(new StreamRecord<>(a2, 3));
+		inputEvents.add(new StreamRecord<>(a3, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+		inputEvents.add(new StreamRecord<>(a4, 5));
+		inputEvents.add(new StreamRecord<>(a5, 5));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		// a* d
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		})
+		.oneOrMore()
+		.optional()
+		.greedy()
+		.followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, a2, a3, d),
+			Lists.newArrayList(a1, a2, a3, a4, a5, d2)
+		));
+	}
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services