You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:19 UTC

[02/10] flink git commit: [FLINK-8725] Separate state from NFA in CEP library

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 37ad006..3879699 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -36,8 +36,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Tests if the {@link NFA} status ({@link NFA#computationStates} or {@link NFA#eventSharedBuffer})
- * is changed after processing events.
+ * Tests if the {@link NFAState} status is changed after processing events.
  */
 public class NFAStatusChangeITCase {
 
@@ -76,45 +75,47 @@ public class NFAStatusChangeITCase {
 		NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
 		NFA<Event> nfa = nfaFactory.createNFA();
 
-		nfa.process(new Event(1, "b", 1.0), 1L);
-		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfa.isNFAChanged());
+		NFAState<Event> nfaState = nfa.createNFAState();
 
-		nfa.resetNFAChanged();
-		nfa.process(new Event(2, "a", 1.0), 2L);
-		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfa.isNFAChanged());
+		nfa.process(nfaState, new Event(1, "b", 1.0), 1L);
+		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
+
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(2, "a", 1.0), 2L);
+		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
 
 		// the status of the queue of ComputationStatus changed,
 		// more than one ComputationStatus is generated by the event from some ComputationStatus
-		nfa.resetNFAChanged();
-		nfa.process(new Event(3, "f", 1.0), 3L);
-		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfa.isNFAChanged());
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(3, "f", 1.0), 3L);
+		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
-		nfa.resetNFAChanged();
-		nfa.process(new Event(4, "f", 1.0), 4L);
-		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfa.isNFAChanged());
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(4, "f", 1.0), 4L);
+		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
-		nfa.resetNFAChanged();
-		nfa.process(new Event(5, "b", 1.0), 5L);
-		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfa.isNFAChanged());
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(5, "b", 1.0), 5L);
+		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
-		nfa.resetNFAChanged();
-		nfa.process(new Event(6, "d", 1.0), 6L);
-		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfa.isNFAChanged());
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(6, "d", 1.0), 6L);
+		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		// as the timestamp is within the window
-		nfa.resetNFAChanged();
-		nfa.process(null, 8L);
-		assertFalse("NFA status should not change as the timestamp is within the window", nfa.isNFAChanged());
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, null, 8L);
+		assertFalse("NFA status should not change as the timestamp is within the window", nfaState.isStateChanged());
 
 		// timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will
 		// be removed from eventSharedBuffer as the timeout happens
-		nfa.resetNFAChanged();
-		Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.process(null, 12L).f1;
-		assertTrue("NFA status should change as timeout happens", nfa.isNFAChanged() && !timeoutResults.isEmpty());
+		nfaState.resetStateChanged();
+		Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.process(nfaState, null, 12L).f1;
+		assertTrue("NFA status should change as timeout happens", nfaState.isStateChanged() && !timeoutResults.isEmpty());
 	}
 
 	@Test
@@ -143,12 +144,14 @@ public class NFAStatusChangeITCase {
 		NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
 		NFA<Event> nfa = nfaFactory.createNFA();
 
-		nfa.resetNFAChanged();
-		nfa.process(new Event(6, "start", 1.0), 6L);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(6, "start", 1.0), 6L);
 
-		nfa.resetNFAChanged();
-		nfa.process(new Event(6, "a", 1.0), 7L);
-		assertTrue(nfa.isNFAChanged());
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(6, "a", 1.0), 7L);
+		assertTrue(nfaState.isStateChanged());
 	}
 
 	@Test
@@ -170,11 +173,13 @@ public class NFAStatusChangeITCase {
 		NFACompiler.NFAFactory<Event> nfaFactory = NFACompiler.compileFactory(pattern, Event.createTypeSerializer(), true);
 		NFA<Event> nfa = nfaFactory.createNFA();
 
-		nfa.resetNFAChanged();
-		nfa.process(new Event(6, "start", 1.0), 6L);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(6, "start", 1.0), 6L);
 
-		nfa.resetNFAChanged();
-		nfa.process(new Event(6, "end", 1.0), 17L);
-		assertTrue(nfa.isNFAChanged());
+		nfaState.resetStateChanged();
+		nfa.process(nfaState, new Event(6, "end", 1.0), 17L);
+		assertTrue(nfaState.isStateChanged());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 7721653..5d43111 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 public class NFATest extends TestLogger {
 	@Test
 	public void testSimpleNFA() {
-		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false);
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -86,9 +85,12 @@ public class NFATest extends TestLogger {
 			});
 		endState.addIgnore(BooleanConditions.<Event>trueFunction());
 
-		nfa.addState(startState);
-		nfa.addState(endState);
-		nfa.addState(endingState);
+		List<State<Event>> states = new ArrayList<>();
+		states.add(startState);
+		states.add(endState);
+		states.add(endingState);
+
+		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), 0, false, states);
 
 		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
 
@@ -103,7 +105,7 @@ public class NFATest extends TestLogger {
 		expectedPatterns.add(firstPattern);
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -126,7 +128,7 @@ public class NFATest extends TestLogger {
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -145,7 +147,7 @@ public class NFATest extends TestLogger {
 
 		Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet();
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -172,7 +174,7 @@ public class NFATest extends TestLogger {
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createNFAState(), streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -187,21 +189,25 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "loop", 3.0), 103L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "loop", 4.0), 104L));
 		streamEvents.add(new StreamRecord<>(new Event(5, "loop", 5.0), 105L));
-		runNFA(nfa, streamEvents);
 
-		NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+		NFAState<Event> nfaState = nfa.createNFAState();
+		runNFA(nfa, nfaState, streamEvents);
+
+		NFAStateSerializer<Event> serializer = new NFAStateSerializer<>(Event.createTypeSerializer());
 
 		//serialize
 		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
+		serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
 		baos.close();
 	}
 
-	public <T> Collection<Map<String, List<T>>> runNFA(NFA<T> nfa, List<StreamRecord<T>> inputs) {
+	public <T> Collection<Map<String, List<T>>> runNFA(
+		NFA<T> nfa, NFAState<T> nfaState, List<StreamRecord<T>> inputs) {
 		Set<Map<String, List<T>>> actualPatterns = new HashSet<>();
 
 		for (StreamRecord<T> streamEvent : inputs) {
 			Collection<Map<String, List<T>>> matchedPatterns = nfa.process(
+				nfaState,
 				streamEvent.getValue(),
 				streamEvent.getTimestamp()).f0;
 
@@ -311,24 +317,26 @@ public class NFATest extends TestLogger {
 			Event b3 = new Event(41, "b", 5.0);
 			Event d = new Event(43, "d", 4.0);
 
-			nfa.process(a, 1);
-			nfa.process(b, 2);
-			nfa.process(c, 3);
-			nfa.process(b1, 4);
-			nfa.process(b2, 5);
-			nfa.process(b3, 6);
-			nfa.process(d, 7);
-			nfa.process(a, 8);
+			NFAState<Event> nfaState = nfa.createNFAState();
+
+			nfa.process(nfaState, a, 1);
+			nfa.process(nfaState, b, 2);
+			nfa.process(nfaState, c, 3);
+			nfa.process(nfaState, b1, 4);
+			nfa.process(nfaState, b2, 5);
+			nfa.process(nfaState, b3, 6);
+			nfa.process(nfaState, d, 7);
+			nfa.process(nfaState, a, 8);
 
-			NFA.NFASerializer<Event> serializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+			NFAStateSerializer<Event> serializer = new NFAStateSerializer<>(Event.createTypeSerializer());
 
 			//serialize
 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			serializer.serialize(nfa, new DataOutputViewStreamWrapper(baos));
+			serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
 			baos.close();
 
 			// copy
-			NFA.NFASerializer<Event> copySerializer = new NFA.NFASerializer<>(Event.createTypeSerializer());
+			NFAStateSerializer<Event> copySerializer = new NFAStateSerializer<>(Event.createTypeSerializer());
 			ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
 			ByteArrayOutputStream out = new ByteArrayOutputStream();
 			copySerializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
@@ -337,15 +345,14 @@ public class NFATest extends TestLogger {
 
 			// deserialize
 			ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
-			NFA<Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
+			NFAState<Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
 			bais.close();
 
-			assertEquals(nfa, copy);
+			assertEquals(nfaState, copy);
 		}
 	}
 
 	private NFA<Event> createStartEndNFA(long windowLength) {
-		NFA<Event> nfa = new NFA<>(Event.createTypeSerializer(), windowLength, false);
 
 		State<Event> startState = new State<>("start", State.StateType.Start);
 		State<Event> endState = new State<>("end", State.StateType.Normal);
@@ -373,11 +380,12 @@ public class NFATest extends TestLogger {
 			});
 		endState.addIgnore(BooleanConditions.<Event>trueFunction());
 
-		nfa.addState(startState);
-		nfa.addState(endState);
-		nfa.addState(endingState);
+		List<State<Event>> states = new ArrayList<>();
+		states.add(startState);
+		states.add(endState);
+		states.add(endingState);
 
-		return nfa;
+		return new NFA<>(Event.createTypeSerializer(), windowLength, false, states);
 	}
 
 	private NFA<Event> createLoopingNFA(long windowLength) {

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
index a9e1795..b5312cb 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -35,16 +35,36 @@ import java.util.Map;
  */
 public class NFATestUtilities {
 
-	public static List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
-		return feedNFA(inputEvents, nfa, AfterMatchSkipStrategy.noSkip());
+	public static List<List<Event>> feedNFA(
+		List<StreamRecord<Event>> inputEvents,
+		NFA<Event> nfa) {
+		return feedNFA(inputEvents, nfa, nfa.createNFAState(), AfterMatchSkipStrategy.noSkip());
 	}
 
-	public static List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa,
-											AfterMatchSkipStrategy afterMatchSkipStrategy) {
+	public static List<List<Event>> feedNFA(
+			List<StreamRecord<Event>> inputEvents,
+			NFA<Event> nfa,
+			NFAState<Event> nfaState) {
+		return feedNFA(inputEvents, nfa, nfaState, AfterMatchSkipStrategy.noSkip());
+	}
+
+	public static List<List<Event>> feedNFA(
+		List<StreamRecord<Event>> inputEvents,
+		NFA<Event> nfa,
+		AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		return feedNFA(inputEvents, nfa, nfa.createNFAState(), afterMatchSkipStrategy);
+	}
+
+	public static List<List<Event>> feedNFA(
+			List<StreamRecord<Event>> inputEvents,
+			NFA<Event> nfa,
+			NFAState<Event> nfaState,
+			AfterMatchSkipStrategy afterMatchSkipStrategy) {
 		List<List<Event>> resultingPatterns = new ArrayList<>();
 
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			Collection<Map<String, List<Event>>> patterns = nfa.process(
+				nfaState,
 				inputEvent.getValue(),
 				inputEvent.getTimestamp(),
 				afterMatchSkipStrategy).f0;

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index 357107f..c94d739 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -140,11 +140,13 @@ public void testClearingBuffer() throws Exception {
 
 	NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+	NFAState<Event> nfaState = nfa.createNFAState();
+
+	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, b1, c1, d)
 	));
-	assertTrue(nfa.isEmpty());
+	assertTrue(nfaState.isEmpty());
 }
 
 @Test
@@ -182,13 +184,15 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 
 	NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+	NFAState<Event> nfaState = nfa.createNFAState();
+
+	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, d1, d2, d3),
 		Lists.newArrayList(a1, d1, d2),
 		Lists.newArrayList(a1, d1)
 	));
-	assertTrue(nfa.isEmpty());
+	assertTrue(nfaState.isEmpty());
 }
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index f88e5b2..b603174 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -91,13 +91,15 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -137,7 +139,9 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -145,7 +149,7 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -185,13 +189,15 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -233,12 +239,14 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -278,14 +286,16 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -325,7 +335,9 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -334,7 +346,7 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -374,14 +386,16 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -502,7 +516,9 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -510,7 +526,7 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1)
 		));
 
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -553,7 +569,9 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -561,7 +579,7 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1)
 		));
 
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 
 	@Test
@@ -604,7 +622,9 @@ public class UntilConditionITCase {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+		NFAState<Event> nfaState = nfa.createNFAState();
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -613,6 +633,6 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent)
 		));
 
-		assertTrue(nfa.isEmpty());
+		assertTrue(nfaState.isEmpty());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index ec2cf47..d1b6d59 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -40,6 +40,7 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -132,7 +133,7 @@ public class NFACompilerTest extends TestLogger {
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
 
-		Set<State<Event>> states = nfa.getStates();
+		Collection<State<Event>> states = nfa.getStates();
 		assertEquals(4, states.size());
 
 		Map<String, State<Event>> stateMap = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 2fda47f..3151498 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -71,7 +71,7 @@ public class CEPMigrationTest {
 
 	@Parameterized.Parameters(name = "Migration Savepoint: {0}")
 	public static Collection<MigrationVersion> parameters () {
-		return Arrays.asList(MigrationVersion.v1_3, MigrationVersion.v1_4);
+		return Arrays.asList(MigrationVersion.v1_5);
 	}
 
 	public CEPMigrationTest(MigrationVersion migrateVersion) {

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 98cb468..5a98445 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -462,9 +462,9 @@ public class CEPOperatorTest extends TestLogger {
 		try {
 			harness.open();
 
-			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
+			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
 			final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
-			Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);
+			Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
 
 			Event startEvent = new Event(42, "c", 1.0);
 			SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -507,9 +507,9 @@ public class CEPOperatorTest extends TestLogger {
 
 			harness.open();
 
-			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaOperatorState");
+			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
 			final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
-			Whitebox.setInternalState(operator, "nfaOperatorState", nfaOperatorStateSpy);
+			Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
 
 			Event startEvent = new Event(42, "c", 1.0);
 			SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -568,8 +568,8 @@ public class CEPOperatorTest extends TestLogger {
 			assertEquals(2L, harness.numEventTimeTimers());
 			assertEquals(4L, operator.getPQSize(42));
 			assertEquals(1L, operator.getPQSize(43));
-			assertTrue(!operator.hasNonEmptyNFA(42));
-			assertTrue(!operator.hasNonEmptyNFA(43));
+			assertTrue(!operator.hasNonEmptyNFAState(42));
+			assertTrue(!operator.hasNonEmptyNFAState(43));
 
 			harness.processWatermark(new Watermark(2L));
 
@@ -581,9 +581,9 @@ public class CEPOperatorTest extends TestLogger {
 			// for 43 the element entered the NFA and the PQ is empty
 
 			assertEquals(2L, harness.numEventTimeTimers());
-			assertTrue(operator.hasNonEmptyNFA(42));
+			assertTrue(operator.hasNonEmptyNFAState(42));
 			assertEquals(1L, operator.getPQSize(42));
-			assertTrue(operator.hasNonEmptyNFA(43));
+			assertTrue(operator.hasNonEmptyNFAState(43));
 			assertTrue(!operator.hasNonEmptyPQ(43));
 
 			harness.processElement(new StreamRecord<>(startEvent2, 4L));
@@ -605,9 +605,9 @@ public class CEPOperatorTest extends TestLogger {
 			// now we have 1 key because the 43 expired and was removed.
 			// 42 is still there due to startEvent2
 			assertEquals(1L, harness.numEventTimeTimers());
-			assertTrue(operator2.hasNonEmptyNFA(42));
+			assertTrue(operator2.hasNonEmptyNFAState(42));
 			assertTrue(!operator2.hasNonEmptyPQ(42));
-			assertTrue(!operator2.hasNonEmptyNFA(43));
+			assertTrue(!operator2.hasNonEmptyNFAState(43));
 			assertTrue(!operator2.hasNonEmptyPQ(43));
 
 			verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
@@ -622,7 +622,7 @@ public class CEPOperatorTest extends TestLogger {
 			harness.processWatermark(20L);
 			harness.processWatermark(21L);
 
-			assertTrue(!operator2.hasNonEmptyNFA(42));
+			assertTrue(!operator2.hasNonEmptyNFAState(42));
 			assertTrue(!operator2.hasNonEmptyPQ(42));
 			assertEquals(0L, harness.numEventTimeTimers());
 
@@ -665,7 +665,7 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertEquals(1L, harness.numEventTimeTimers());
 			assertEquals(7L, operator.getPQSize(41));
-			assertTrue(!operator.hasNonEmptyNFA(41));
+			assertTrue(!operator.hasNonEmptyNFAState(41));
 
 			harness.processWatermark(new Watermark(2L));
 
@@ -674,7 +674,7 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertEquals(1L, harness.numEventTimeTimers());
 			assertEquals(6L, operator.getPQSize(41));
-			assertTrue(operator.hasNonEmptyNFA(41)); // processed the first element
+			assertTrue(operator.hasNonEmptyNFAState(41)); // processed the first element
 
 			harness.processWatermark(new Watermark(8L));
 
@@ -714,12 +714,12 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertEquals(1L, harness.numEventTimeTimers());
 			assertEquals(0L, operator.getPQSize(41));
-			assertTrue(operator.hasNonEmptyNFA(41));
+			assertTrue(operator.hasNonEmptyNFAState(41));
 
 			harness.processWatermark(new Watermark(17L));
 			verifyWatermark(harness.getOutput().poll(), 17L);
 
-			assertTrue(!operator.hasNonEmptyNFA(41));
+			assertTrue(!operator.hasNonEmptyNFAState(41));
 			assertTrue(!operator.hasNonEmptyPQ(41));
 			assertEquals(0L, harness.numEventTimeTimers());
 		} finally {
@@ -800,8 +800,8 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertTrue(!operator.hasNonEmptyPQ(42));
 			assertTrue(!operator.hasNonEmptyPQ(43));
-			assertTrue(operator.hasNonEmptyNFA(42));
-			assertTrue(operator.hasNonEmptyNFA(43));
+			assertTrue(operator.hasNonEmptyNFAState(42));
+			assertTrue(operator.hasNonEmptyNFAState(43));
 
 			harness.setProcessingTime(3L);
 
@@ -834,10 +834,10 @@ public class CEPOperatorTest extends TestLogger {
 
 			harness.setProcessingTime(21L);
 
-			assertTrue(operator2.hasNonEmptyNFA(42));
+			assertTrue(operator2.hasNonEmptyNFAState(42));
 
 			harness.processElement(new StreamRecord<>(startEvent1, 21L));
-			assertTrue(operator2.hasNonEmptyNFA(42));
+			assertTrue(operator2.hasNonEmptyNFAState(42));
 
 			harness.setProcessingTime(49L);
 
@@ -845,7 +845,7 @@ public class CEPOperatorTest extends TestLogger {
 			harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
 
 			// the pattern expired
-			assertTrue(!operator2.hasNonEmptyNFA(42));
+			assertTrue(!operator2.hasNonEmptyNFAState(42));
 
 			assertEquals(0L, harness.numEventTimeTimers());
 			assertTrue(!operator2.hasNonEmptyPQ(42));
@@ -988,12 +988,12 @@ public class CEPOperatorTest extends TestLogger {
 			harness
 				.processElement(new StreamRecord<>(new SubEvent(42, "barfoo", 1.0, 5.0), 0L));
 
-			assertTrue(!operator.hasNonEmptyNFA(42));
-			assertTrue(!operator.hasNonEmptyNFA(43));
+			assertTrue(!operator.hasNonEmptyNFAState(42));
+			assertTrue(!operator.hasNonEmptyNFAState(43));
 
 			harness.setProcessingTime(3L);
-			assertTrue(operator.hasNonEmptyNFA(42));
-			assertTrue(operator.hasNonEmptyNFA(43));
+			assertTrue(operator.hasNonEmptyNFAState(42));
+			assertTrue(operator.hasNonEmptyNFAState(43));
 
 			harness.processElement(new StreamRecord<>(middleEvent2, 3L));
 			harness.processElement(new StreamRecord<>(middleEvent1, 3L));
@@ -1047,14 +1047,14 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertTrue(operator.hasNonEmptyPQ(42));
 			assertTrue(operator.hasNonEmptyPQ(43));
-			assertTrue(!operator.hasNonEmptyNFA(42));
-			assertTrue(!operator.hasNonEmptyNFA(43));
+			assertTrue(!operator.hasNonEmptyNFAState(42));
+			assertTrue(!operator.hasNonEmptyNFAState(43));
 
 			harness.processWatermark(3L);
 			assertTrue(!operator.hasNonEmptyPQ(42));
 			assertTrue(!operator.hasNonEmptyPQ(43));
-			assertTrue(operator.hasNonEmptyNFA(42));
-			assertTrue(operator.hasNonEmptyNFA(43));
+			assertTrue(operator.hasNonEmptyNFAState(42));
+			assertTrue(operator.hasNonEmptyNFAState(43));
 
 			harness.processElement(new StreamRecord<>(startEvent2, 4L));
 			harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot
deleted file mode 100644
index 34c5110..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.4-snapshot and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot
new file mode 100644
index 0000000..73934e5
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-after-branching-flink1.5-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot
deleted file mode 100644
index d4d3405..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.4-snapshot and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot
new file mode 100644
index 0000000..3126ff4
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-conditions-flink1.5-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot
deleted file mode 100644
index c87f307..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.4-snapshot and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot
new file mode 100644
index 0000000..63b1b49
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-single-pattern-afterwards-flink1.5-snapshot differ

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot
deleted file mode 100644
index 660a21a..0000000
Binary files a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.4-snapshot and /dev/null differ

http://git-wip-us.apache.org/repos/asf/flink/blob/55cd059a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot
new file mode 100644
index 0000000..1b1fa75
Binary files /dev/null and b/flink-libraries/flink-cep/src/test/resources/cep-migration-starting-new-pattern-flink1.5-snapshot differ