You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/07/25 12:33:39 UTC

flink git commit: [FLINK-7170] [cep] Fix until condition when the contiguity is strict

Repository: flink
Updated Branches:
  refs/heads/master b13914799 -> 449c84b0e


[FLINK-7170] [cep] Fix until condition when the contiguity is strict

This closes #4318.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/449c84b0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/449c84b0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/449c84b0

Branch: refs/heads/master
Commit: 449c84b0e21c6cfe43ab9ed697a69fa09308b381
Parents: b139147
Author: Dian Fu <fu...@alibaba-inc.com>
Authored: Thu Jul 13 17:21:30 2017 +0800
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Tue Jul 25 14:30:59 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/nfa/compiler/NFACompiler.java     | 30 ++++++++-----
 .../flink/cep/nfa/UntilConditionITCase.java     | 47 ++++++++++++++++++++
 2 files changed, 67 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index e160e4a..62464d1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -630,10 +630,12 @@ public class NFACompiler {
 
 			final IterativeCondition<T> ignoreCondition = extendWithUntilCondition(
 				getInnerIgnoreCondition(currentPattern),
-				untilCondition);
+				untilCondition,
+				false);
 			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
 				getTakeCondition(currentPattern),
-				untilCondition);
+				untilCondition,
+				true);
 
 			final IterativeCondition<T> proceedCondition = getTrueFunction();
 			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
@@ -664,7 +666,8 @@ public class NFACompiler {
 		private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
 			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
 				getTakeCondition(currentPattern),
-				(IterativeCondition<T>) currentPattern.getUntilCondition()
+				(IterativeCondition<T>) currentPattern.getUntilCondition(),
+				true
 			);
 
 			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
@@ -683,7 +686,8 @@ public class NFACompiler {
 		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
 			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
 				getTakeCondition(currentPattern),
-				(IterativeCondition<T>) currentPattern.getUntilCondition()
+				(IterativeCondition<T>) currentPattern.getUntilCondition(),
+				true
 			);
 
 			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
@@ -697,14 +701,16 @@ public class NFACompiler {
 		 *
 		 * @param condition the condition to extend
 		 * @param untilCondition the until condition to join with the given condition
+		 * @param isTakeCondition whether the {@code condition} is for the {@code TAKE} edge
 		 * @return condition with AND applied or the original condition
 		 */
 		private IterativeCondition<T> extendWithUntilCondition(
 				IterativeCondition<T> condition,
-				IterativeCondition<T> untilCondition) {
+				IterativeCondition<T> untilCondition,
+				boolean isTakeCondition) {
 			if (untilCondition != null && condition != null) {
 				return new AndCondition<>(new NotCondition<>(untilCondition), condition);
-			} else if (untilCondition != null) {
+			} else if (untilCondition != null && isTakeCondition) {
 				return new NotCondition<>(untilCondition);
 			}
 
@@ -741,7 +747,8 @@ public class NFACompiler {
 			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
 				innerIgnoreCondition = extendWithUntilCondition(
 					innerIgnoreCondition,
-					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+					false);
 			}
 			return innerIgnoreCondition;
 		}
@@ -781,7 +788,8 @@ public class NFACompiler {
 			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
 				ignoreCondition = extendWithUntilCondition(
 					ignoreCondition,
-					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+					false);
 			}
 			return ignoreCondition;
 		}
@@ -797,7 +805,8 @@ public class NFACompiler {
 			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
 				takeCondition = extendWithUntilCondition(
 					takeCondition,
-					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+					true);
 			}
 			return takeCondition;
 		}
@@ -811,7 +820,8 @@ public class NFACompiler {
 			if (currentGroupPattern != null && currentGroupPattern.getUntilCondition() != null) {
 				trueCondition = extendWithUntilCondition(
 					trueCondition,
-					(IterativeCondition<T>) currentGroupPattern.getUntilCondition());
+					(IterativeCondition<T>) currentGroupPattern.getUntilCondition(),
+					true);
 			}
 			return trueCondition;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/449c84b0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index d56e883..639541d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -194,6 +194,53 @@ public class UntilConditionITCase {
 	}
 
 	@Test
+	public void testUntilConditionFollowedByOneOrMoreConsecutive2() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "b", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event breaking = new Event(45, "a", 5.0);
+		Event ignored = new Event(46, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(breaking, 7));
+		inputEvents.add(new StreamRecord<>(ignored, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().consecutive().until(UNTIL_CONDITION)
+			.followedBy("end").where(
+				UNTIL_CONDITION
+			);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, breaking)
+		));
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
 	public void testUntilConditionFollowedByZeroOrMore() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();