Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:21 UTC

[04/10] flink git commit: [FLINK-9418] Migrate SharedBuffer to use MapState

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index c94d739..88504e9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
@@ -36,7 +35,8 @@ import java.util.List;
 
 import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
 import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for handling Events that are equal in case of {@link Object#equals(Object)} and have same timestamps.
@@ -45,7 +45,7 @@ import static org.junit.Assert.assertTrue;
 public class SameElementITCase extends TestLogger {
 
 	@Test
-	public void testEagerZeroOrMoreSameElement() {
+	public void testEagerZeroOrMoreSameElement() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -87,7 +87,7 @@ public class SameElementITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -138,15 +138,16 @@ public void testClearingBuffer() throws Exception {
 		}
 	});
 
-	NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+	NFA<Event> nfa = compile(pattern, false);
 
-	NFAState<Event> nfaState = nfa.createNFAState();
+	NFAState nfaState = nfa.createInitialNFAState();
 
 	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, b1, c1, d)
 	));
-	assertTrue(nfaState.isEmpty());
+	assertEquals(1, nfaState.getComputationStates().size());
+	assertEquals("a", nfaState.getComputationStates().peek().getCurrentStateName());
 }
 
 @Test
@@ -182,9 +183,9 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 		}
 	});
 
-	NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+	NFA<Event> nfa = compile(pattern, false);
 
-	NFAState<Event> nfaState = nfa.createNFAState();
+	NFAState nfaState = nfa.createInitialNFAState();
 
 	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
@@ -192,11 +193,12 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 		Lists.newArrayList(a1, d1, d2),
 		Lists.newArrayList(a1, d1)
 	));
-	assertTrue(nfaState.isEmpty());
+	assertEquals(1, nfaState.getComputationStates().size());
+	assertEquals("a", nfaState.getComputationStates().peek().getCurrentStateName());
 }
 
 	@Test
-	public void testZeroOrMoreSameElement() {
+	public void testZeroOrMoreSameElement() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -239,7 +241,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -320,7 +322,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -375,7 +377,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -418,7 +420,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 			}
 		}).oneOrMore().optional();
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -481,7 +483,7 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 

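For readers tracking the API migration in these test hunks, the before/after boils down to the following. This is a minimal sketch assembled only from the lines above (the static compile comes from the new org.apache.flink.cep.utils.NFAUtils test helper, and the surrounding test method now declares throws Exception):

    // before: compilation needed the event serializer, and the runtime state was generic
    //   NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
    //   NFAState<Event> nfaState = nfa.createNFAState();

    // after: the serializer argument is gone and the runtime state is created explicitly
    NFA<Event> nfa = compile(pattern, false);
    NFAState nfaState = nfa.createInitialNFAState();

    List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);

    // the new assertions suggest the NFAState always retains the starting computation
    // state ("a" in this test), so the old isEmpty() check is replaced by:
    assertEquals(1, nfaState.getComputationStates().size());
    assertEquals("a", nfaState.getComputationStates().peek().getCurrentStateName());
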
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
deleted file mode 100644
index 566e2b9..0000000
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.nfa;
-
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.cep.Event;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.flink.util.TestLogger;
-
-import org.junit.Test;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Tests for {@link SharedBuffer}.
- */
-public class SharedBufferTest extends TestLogger {
-
-	@Test
-	public void testSharedBuffer() {
-		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
-		int numberEvents = 8;
-		Event[] events = new Event[numberEvents];
-		final long timestamp = 1L;
-
-		for (int i = 0; i < numberEvents; i++) {
-			events[i] = new Event(i + 1, "e" + (i + 1), i);
-		}
-
-		Map<String, List<Event>> expectedPattern1 = new HashMap<>();
-		expectedPattern1.put("a1", new ArrayList<Event>());
-		expectedPattern1.get("a1").add(events[2]);
-
-		expectedPattern1.put("a[]", new ArrayList<Event>());
-		expectedPattern1.get("a[]").add(events[3]);
-
-		expectedPattern1.put("b", new ArrayList<Event>());
-		expectedPattern1.get("b").add(events[5]);
-
-		Map<String, List<Event>> expectedPattern2 = new HashMap<>();
-		expectedPattern2.put("a1", new ArrayList<Event>());
-		expectedPattern2.get("a1").add(events[0]);
-
-		expectedPattern2.put("a[]", new ArrayList<Event>());
-		expectedPattern2.get("a[]").add(events[1]);
-		expectedPattern2.get("a[]").add(events[2]);
-		expectedPattern2.get("a[]").add(events[3]);
-		expectedPattern2.get("a[]").add(events[4]);
-
-		expectedPattern2.put("b", new ArrayList<Event>());
-		expectedPattern2.get("b").add(events[5]);
-
-		Map<String, List<Event>> expectedPattern3 = new HashMap<>();
-		expectedPattern3.put("a1", new ArrayList<Event>());
-		expectedPattern3.get("a1").add(events[0]);
-
-		expectedPattern3.put("a[]", new ArrayList<Event>());
-		expectedPattern3.get("a[]").add(events[1]);
-		expectedPattern3.get("a[]").add(events[2]);
-		expectedPattern3.get("a[]").add(events[3]);
-		expectedPattern3.get("a[]").add(events[4]);
-		expectedPattern3.get("a[]").add(events[5]);
-		expectedPattern3.get("a[]").add(events[6]);
-
-		expectedPattern3.put("b", new ArrayList<Event>());
-		expectedPattern3.get("b").add(events[7]);
-
-		sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
-		sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("a1", events[2], timestamp, null, null, 0, 0, DeweyNumber.fromString("2"));
-		sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, 1, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, 2, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, 0, DeweyNumber.fromString("2.0"));
-		sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, 3, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.1"));
-		sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, 1, DeweyNumber.fromString("2.0.0"));
-		sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
-		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
-
-		Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
-		sharedBuffer.release("b", events[7], timestamp, 7);
-		Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
-
-		Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
-		Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.release("b", events[5], timestamp, 2);
-		sharedBuffer.release("b", events[5], timestamp, 5);
-
-		assertEquals(1L, patterns3.size());
-		assertEquals(0L, patterns4.size());
-		assertEquals(1L, patterns1.size());
-		assertEquals(1L, patterns2.size());
-
-		assertTrue(sharedBuffer.isEmpty());
-		assertTrue(patterns4.isEmpty());
-		assertEquals(Collections.singletonList(expectedPattern1), patterns1);
-		assertEquals(Collections.singletonList(expectedPattern2), patterns2);
-		assertEquals(Collections.singletonList(expectedPattern3), patterns3);
-	}
-
-	@Test
-	public void testSharedBufferSerialization() throws IOException, ClassNotFoundException {
-		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
-		int numberEvents = 8;
-		Event[] events = new Event[numberEvents];
-		final long timestamp = 1L;
-
-		for (int i = 0; i < numberEvents; i++) {
-			events[i] = new Event(i + 1, "e" + (i + 1), i);
-		}
-
-		sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
-		sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("a1", events[2], timestamp, null, null, 0, 0, DeweyNumber.fromString("2"));
-		sharedBuffer.put("a[]", events[2], timestamp, "a[]", events[1], timestamp, 1, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("a[]", events[3], timestamp, "a[]", events[2], timestamp, 2, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("a[]", events[3], timestamp, "a1", events[2], timestamp, 0, DeweyNumber.fromString("2.0"));
-		sharedBuffer.put("a[]", events[4], timestamp, "a[]", events[3], timestamp, 3, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("b", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.put("a[]", events[5], timestamp, "a[]", events[4], timestamp, 4, DeweyNumber.fromString("1.1"));
-		sharedBuffer.put("b", events[5], timestamp, "a[]", events[3], timestamp, 1, DeweyNumber.fromString("2.0.0"));
-		sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
-		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
-
-		SharedBuffer.SharedBufferSerializer serializer = new SharedBuffer.SharedBufferSerializer(
-				StringSerializer.INSTANCE, Event.createTypeSerializer());
-
-		ByteArrayOutputStream baos = new ByteArrayOutputStream();
-		serializer.serialize(sharedBuffer, new DataOutputViewStreamWrapper(baos));
-
-		ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
-		SharedBuffer<String, Event> copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
-
-		assertEquals(sharedBuffer, copy);
-	}
-
-	@Test
-	public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() {
-		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
-		int numberEvents = 8;
-		Event[] events = new Event[numberEvents];
-		final long timestamp = 1L;
-
-		for (int i = 0; i < numberEvents; i++) {
-			events[i] = new Event(i + 1, "e" + (i + 1), i);
-		}
-
-		sharedBuffer.put("start", events[1], timestamp, DeweyNumber.fromString("1"));
-		sharedBuffer.put("branching", events[2], timestamp, "start", events[1], timestamp, 0, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("branching", events[3], timestamp, "start", events[1], timestamp, 0, DeweyNumber.fromString("1.1"));
-		sharedBuffer.put("branching", events[3], timestamp, "branching", events[2], timestamp, 1, DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, 2, DeweyNumber.fromString("1.0.0.0"));
-		sharedBuffer.put("branching", events[4], timestamp, "branching", events[3], timestamp, 2, DeweyNumber.fromString("1.1.0"));
-
-		//simulate IGNORE (next event can point to events[2])
-		sharedBuffer.lock("branching", events[2], timestamp, 1);
-
-		sharedBuffer.release("branching", events[4], timestamp, 3);
-
-		//There should be still events[1] and events[2] in the buffer
-		assertFalse(sharedBuffer.isEmpty());
-	}
-
-	@Test
-	public void testSharedBufferExtractOrder() {
-		SharedBuffer<String, Event> sharedBuffer = new SharedBuffer<>();
-		int numberEvents = 10;
-		Event[] events = new Event[numberEvents];
-		final long timestamp = 1L;
-
-		for (int i = 0; i < numberEvents; i++) {
-			events[i] = new Event(i + 1, "e" + (i + 1), i);
-		}
-
-		Map<String, List<Event>> expectedResult = new LinkedHashMap<>();
-		expectedResult.put("a", new ArrayList<>());
-		expectedResult.get("a").add(events[1]);
-		expectedResult.put("b", new ArrayList<>());
-		expectedResult.get("b").add(events[2]);
-		expectedResult.put("aa", new ArrayList<>());
-		expectedResult.get("aa").add(events[3]);
-		expectedResult.put("bb", new ArrayList<>());
-		expectedResult.get("bb").add(events[4]);
-		expectedResult.put("c", new ArrayList<>());
-		expectedResult.get("c").add(events[5]);
-
-		sharedBuffer.put("a", events[1], timestamp, DeweyNumber.fromString("1"));
-		sharedBuffer.put("b", events[2], timestamp, "a", events[1], timestamp, 0, DeweyNumber.fromString("1.0"));
-		sharedBuffer.put("aa", events[3], timestamp, "b", events[2], timestamp, 1, DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.put("bb", events[4], timestamp, "aa", events[3], timestamp, 2, DeweyNumber.fromString("1.0.0.0"));
-		sharedBuffer.put("c", events[5], timestamp, "bb", events[4], timestamp, 3, DeweyNumber.fromString("1.0.0.0.0"));
-
-		Collection<Map<String, List<Event>>> patternsResult = sharedBuffer.extractPatterns("c", events[5], timestamp, 4, DeweyNumber.fromString("1.0.0.0.0"));
-
-		List<String> expectedOrder = new ArrayList<>();
-		expectedOrder.add("a");
-		expectedOrder.add("b");
-		expectedOrder.add("aa");
-		expectedOrder.add("bb");
-		expectedOrder.add("c");
-
-		List<String> resultOrder = new ArrayList<>();
-		for (String key: patternsResult.iterator().next().keySet()){
-			resultOrder.add(key);
-		}
-		assertEquals(expectedOrder, resultOrder);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
index 4e540dd..049c84b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,13 +33,14 @@ import java.util.List;
 
 import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
 import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
  * Tests for {@link Pattern#timesOrMore(int)}.
  */
 public class TimesOrMoreITCase extends TestLogger {
 	@Test
-	public void testTimesOrMore() {
+	public void testTimesOrMore() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -79,7 +79,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -91,7 +91,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreNonStrict() {
+	public void testTimesOrMoreNonStrict() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -126,7 +126,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -139,7 +139,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreStrict() {
+	public void testTimesOrMoreStrict() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -174,7 +174,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -184,7 +184,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreStrictOptional() {
+	public void testTimesOrMoreStrictOptional() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -219,7 +219,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -230,7 +230,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreStrictOptional2() {
+	public void testTimesOrMoreStrictOptional2() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -263,7 +263,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -275,7 +275,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreNonStrictOptional() {
+	public void testTimesOrMoreNonStrictOptional() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -306,7 +306,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -316,7 +316,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreNonStrictOptional2() {
+	public void testTimesOrMoreNonStrictOptional2() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -351,7 +351,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -365,7 +365,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreNonStrictOptional3() {
+	public void testTimesOrMoreNonStrictOptional3() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -400,7 +400,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -413,7 +413,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreNonStrictWithNext() {
+	public void testTimesOrMoreNonStrictWithNext() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -448,7 +448,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -460,7 +460,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreNotStrictWithFollowedBy() {
+	public void testTimesOrMoreNotStrictWithFollowedBy() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -493,7 +493,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -504,7 +504,7 @@ public class TimesOrMoreITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesOrMoreNotStrictWithFollowedByAny() {
+	public void testTimesOrMoreNotStrictWithFollowedByAny() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -537,7 +537,7 @@ public class TimesOrMoreITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 76ed26a..203a1c2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -34,6 +33,7 @@ import java.util.List;
 
 import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
 import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
  * Tests for {@link Pattern#times(int, int)}.
@@ -42,7 +42,7 @@ import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
 public class TimesRangeITCase extends TestLogger {
 
 	@Test
-	public void testTimesRange() {
+	public void testTimesRange() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -80,7 +80,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -93,7 +93,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeFromZero() {
+	public void testTimesRangeFromZero() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		Event startEvent = new Event(40, "c", 1.0);
@@ -131,7 +131,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -144,7 +144,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeNonStrict() {
+	public void testTimesRangeNonStrict() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -178,7 +178,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -194,7 +194,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeStrict() {
+	public void testTimesRangeStrict() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -228,7 +228,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -242,7 +242,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeStrictOptional() {
+	public void testTimesRangeStrictOptional() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -276,7 +276,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -290,7 +290,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeStrictOptional1() {
+	public void testTimesRangeStrictOptional1() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -322,7 +322,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -335,7 +335,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeNonStrictOptional1() {
+	public void testTimesRangeNonStrictOptional1() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -365,7 +365,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -375,7 +375,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeNonStrictOptional2() {
+	public void testTimesRangeNonStrictOptional2() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -409,7 +409,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -423,7 +423,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeNonStrictOptional3() {
+	public void testTimesRangeNonStrictOptional3() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -457,7 +457,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -470,7 +470,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeNonStrictWithNext() {
+	public void testTimesRangeNonStrictWithNext() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -504,7 +504,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -516,7 +516,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeNotStrictWithFollowedBy() {
+	public void testTimesRangeNotStrictWithFollowedBy() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -548,7 +548,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -559,7 +559,7 @@ public class TimesRangeITCase extends TestLogger {
 	}
 
 	@Test
-	public void testTimesRangeNotStrictWithFollowedByAny() {
+	public void testTimesRangeNotStrictWithFollowedByAny() throws Exception {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
 		inputEvents.add(new StreamRecord<>(ConsecutiveData.startEvent, 1));
@@ -591,7 +591,7 @@ public class TimesRangeITCase extends TestLogger {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index b603174..fb7f086 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
@@ -34,7 +33,8 @@ import java.util.List;
 
 import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
 import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
-import static org.junit.Assert.assertTrue;
+import static org.apache.flink.cep.utils.NFAUtils.compile;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for {@link Pattern#until(IterativeCondition)}.
@@ -89,9 +89,9 @@ public class UntilConditionITCase {
 				UNTIL_CONDITION
 			);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -99,7 +99,9 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfaState.isEmpty());
+
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -137,9 +139,9 @@ public class UntilConditionITCase {
 		}).oneOrMore().allowCombinations().until(UNTIL_CONDITION)
 			.followedBy("end").where(UNTIL_CONDITION);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -149,7 +151,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -187,9 +190,9 @@ public class UntilConditionITCase {
 				UNTIL_CONDITION
 			);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -197,7 +200,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -237,16 +241,17 @@ public class UntilConditionITCase {
 				UNTIL_CONDITION
 			);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
 		));
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -284,9 +289,9 @@ public class UntilConditionITCase {
 				UNTIL_CONDITION
 			);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -295,7 +300,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -333,9 +339,9 @@ public class UntilConditionITCase {
 		}).oneOrMore().optional().allowCombinations().until(UNTIL_CONDITION)
 			.followedBy("end").where(UNTIL_CONDITION);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -346,7 +352,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -384,9 +391,9 @@ public class UntilConditionITCase {
 				UNTIL_CONDITION
 			);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -395,7 +402,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1, breaking),
 			Lists.newArrayList(startEvent, breaking)
 		));
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -430,7 +438,7 @@ public class UntilConditionITCase {
 			}
 		}).oneOrMore().until(UNTIL_CONDITION);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -474,7 +482,7 @@ public class UntilConditionITCase {
 			}
 		}).oneOrMore().optional().until(UNTIL_CONDITION);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
@@ -514,9 +522,9 @@ public class UntilConditionITCase {
 			}
 		}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -526,7 +534,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1)
 		));
 
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -567,9 +576,9 @@ public class UntilConditionITCase {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -579,7 +588,8 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent, middleEvent1)
 		));
 
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 
 	@Test
@@ -620,9 +630,9 @@ public class UntilConditionITCase {
 			}
 		});
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+		NFA<Event> nfa = compile(pattern, false);
 
-		NFAState<Event> nfaState = nfa.createNFAState();
+		NFAState nfaState = nfa.createInitialNFAState();
 
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
 
@@ -633,6 +643,7 @@ public class UntilConditionITCase {
 			Lists.newArrayList(startEvent)
 		));
 
-		assertTrue(nfaState.isEmpty());
+		assertEquals(1, nfaState.getComputationStates().size());
+		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index d1b6d59..92f59a5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -18,10 +18,7 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
@@ -46,6 +43,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -72,9 +70,6 @@ public class NFACompilerTest extends TestLogger {
 		}
 	};
 
-	private static final TypeSerializer<Event> serializer = TypeExtractor.createTypeInfo(Event.class)
-		.createSerializer(new ExecutionConfig());
-
 	@Rule
 	public ExpectedException expectedException = ExpectedException.none();
 
@@ -90,7 +85,7 @@ public class NFACompilerTest extends TestLogger {
 			.followedBy("start").where(new TestFilter());
 
 		// here we must have an exception because of the two "start" patterns with the same name.
-		NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+		compile(invalidPattern, false);
 	}
 
 	@Test
@@ -105,7 +100,7 @@ public class NFACompilerTest extends TestLogger {
 			.notFollowedBy("end").where(new TestFilter());
 
 		// here we must have an exception because of the two "start" patterns with the same name.
-		NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+		compile(invalidPattern, false);
 	}
 
 	/**
@@ -131,7 +126,7 @@ public class NFACompilerTest extends TestLogger {
 			.followedBy("middle").subtype(SubEvent.class)
 			.next("end").where(endFilter);
 
-		NFA<Event> nfa = NFACompiler.compile(pattern, serializer, false);
+		NFA<Event> nfa = compile(pattern, false);
 
 		Collection<State<Event>> states = nfa.getStates();
 		assertEquals(4, states.size());
@@ -218,7 +213,7 @@ public class NFACompilerTest extends TestLogger {
 			}
 		});
 
-		NFACompiler.compile(invalidPattern, Event.createTypeSerializer(), false);
+		compile(invalidPattern, false);
 	}
 
 	private <T> Set<Tuple2<String, StateTransitionAction>> unfoldTransitions(final State<T> state) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
new file mode 100644
index 0000000..9be7cc1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link SharedBuffer}.
+ */
+public class SharedBufferTest extends TestLogger {
+
+	@Test
+	public void testSharedBuffer() throws Exception {
+		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		int numberEvents = 8;
+		Event[] events = new Event[numberEvents];
+		EventId[] eventIds = new EventId[numberEvents];
+		final long timestamp = 1L;
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+			eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+		}
+
+		Map<String, List<Event>> expectedPattern1 = new HashMap<>();
+		expectedPattern1.put("a1", new ArrayList<>());
+		expectedPattern1.get("a1").add(events[2]);
+
+		expectedPattern1.put("a[]", new ArrayList<>());
+		expectedPattern1.get("a[]").add(events[3]);
+
+		expectedPattern1.put("b", new ArrayList<>());
+		expectedPattern1.get("b").add(events[5]);
+
+		Map<String, List<Event>> expectedPattern2 = new HashMap<>();
+		expectedPattern2.put("a1", new ArrayList<>());
+		expectedPattern2.get("a1").add(events[0]);
+
+		expectedPattern2.put("a[]", new ArrayList<>());
+		expectedPattern2.get("a[]").add(events[1]);
+		expectedPattern2.get("a[]").add(events[2]);
+		expectedPattern2.get("a[]").add(events[3]);
+		expectedPattern2.get("a[]").add(events[4]);
+
+		expectedPattern2.put("b", new ArrayList<>());
+		expectedPattern2.get("b").add(events[5]);
+
+		Map<String, List<Event>> expectedPattern3 = new HashMap<>();
+		expectedPattern3.put("a1", new ArrayList<>());
+		expectedPattern3.get("a1").add(events[0]);
+
+		expectedPattern3.put("a[]", new ArrayList<>());
+		expectedPattern3.get("a[]").add(events[1]);
+		expectedPattern3.get("a[]").add(events[2]);
+		expectedPattern3.get("a[]").add(events[3]);
+		expectedPattern3.get("a[]").add(events[4]);
+		expectedPattern3.get("a[]").add(events[5]);
+		expectedPattern3.get("a[]").add(events[6]);
+
+		expectedPattern3.put("b", new ArrayList<>());
+		expectedPattern3.get("b").add(events[7]);
+
+		NodeId a10 = sharedBuffer.put("a1", eventIds[0], DeweyNumber.fromString("1"));
+		NodeId aLoop0 = sharedBuffer.put("a[]", eventIds[1], a10, DeweyNumber.fromString("1.0"));
+		NodeId a11 = sharedBuffer.put("a1", eventIds[2], DeweyNumber.fromString("2"));
+		NodeId aLoop1 = sharedBuffer.put("a[]", eventIds[2], aLoop0, DeweyNumber.fromString("1.0"));
+		NodeId aLoop2 = sharedBuffer.put("a[]", eventIds[3], aLoop1, DeweyNumber.fromString("1.0"));
+		NodeId aSecondLoop0 = sharedBuffer.put("a[]", eventIds[3], a11, DeweyNumber.fromString("2.0"));
+		NodeId aLoop3 = sharedBuffer.put("a[]", eventIds[4], aLoop2, DeweyNumber.fromString("1.0"));
+		NodeId b0 = sharedBuffer.put("b", eventIds[5], aLoop3, DeweyNumber.fromString("1.0.0"));
+		NodeId aLoop4 = sharedBuffer.put("a[]", eventIds[5], aLoop3, DeweyNumber.fromString("1.1"));
+		NodeId b1 = sharedBuffer.put("b", eventIds[5], aSecondLoop0, DeweyNumber.fromString("2.0.0"));
+		NodeId aLoop5 = sharedBuffer.put("a[]", eventIds[6], aLoop4, DeweyNumber.fromString("1.1"));
+		NodeId b3 = sharedBuffer.put("b", eventIds[7], aLoop5, DeweyNumber.fromString("1.1.0"));
+
+		Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+		sharedBuffer.releaseNode(b3);
+		Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+
+		Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns(b1, DeweyNumber.fromString("2.0.0"));
+		Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns(b0, DeweyNumber.fromString("1.0.0"));
+		sharedBuffer.releaseNode(b0);
+		sharedBuffer.releaseNode(b1);
+
+		for (EventId eventId : eventIds) {
+			sharedBuffer.releaseEvent(eventId);
+		}
+
+		assertEquals(1L, patterns3.size());
+		assertEquals(0L, patterns4.size());
+		assertEquals(1L, patterns1.size());
+		assertEquals(1L, patterns2.size());
+
+		assertTrue(sharedBuffer.isEmpty());
+		assertTrue(patterns4.isEmpty());
+		assertEquals(Collections.singletonList(expectedPattern1), patterns1);
+		assertEquals(Collections.singletonList(expectedPattern2), patterns2);
+		assertEquals(Collections.singletonList(expectedPattern3), patterns3);
+	}
+
+	@Test
+	public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() throws Exception {
+		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		int numberEvents = 8;
+		Event[] events = new Event[numberEvents];
+		EventId[] eventIds = new EventId[numberEvents];
+		final long timestamp = 1L;
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+			eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+		}
+
+		NodeId start = sharedBuffer.put("start", eventIds[1], DeweyNumber.fromString("1"));
+		NodeId b0 = sharedBuffer.put("branching", eventIds[2], start, DeweyNumber.fromString("1.0"));
+		NodeId b1 = sharedBuffer.put("branching", eventIds[3], start, DeweyNumber.fromString("1.1"));
+		NodeId b00 = sharedBuffer.put("branching", eventIds[3], b0, DeweyNumber.fromString("1.0.0"));
+		sharedBuffer.put("branching", eventIds[4], b00, DeweyNumber.fromString("1.0.0.0"));
+		NodeId b10 = sharedBuffer.put("branching", eventIds[4], b1, DeweyNumber.fromString("1.1.0"));
+
+		//simulate IGNORE (next event can point to events[2])
+		sharedBuffer.lockNode(b0);
+
+		sharedBuffer.releaseNode(b10);
+
+		for (EventId eventId : eventIds) {
+			sharedBuffer.releaseEvent(eventId);
+		}
+
+		//There should be still events[1] and events[2] in the buffer
+		assertFalse(sharedBuffer.isEmpty());
+	}
+
+	@Test
+	public void testSharedBufferExtractOrder() throws Exception {
+		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		int numberEvents = 5;
+		Event[] events = new Event[numberEvents];
+		EventId[] eventIds = new EventId[numberEvents];
+		final long timestamp = 1L;
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+			eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
+		}
+
+		Map<String, List<Event>> expectedResult = new LinkedHashMap<>();
+		expectedResult.put("a", new ArrayList<>());
+		expectedResult.get("a").add(events[0]);
+		expectedResult.put("b", new ArrayList<>());
+		expectedResult.get("b").add(events[1]);
+		expectedResult.put("aa", new ArrayList<>());
+		expectedResult.get("aa").add(events[2]);
+		expectedResult.put("bb", new ArrayList<>());
+		expectedResult.get("bb").add(events[3]);
+		expectedResult.put("c", new ArrayList<>());
+		expectedResult.get("c").add(events[4]);
+
+		NodeId a = sharedBuffer.put("a", eventIds[0], DeweyNumber.fromString("1"));
+		NodeId b = sharedBuffer.put("b", eventIds[1], a, DeweyNumber.fromString("1.0"));
+		NodeId aa = sharedBuffer.put("aa", eventIds[2], b, DeweyNumber.fromString("1.0.0"));
+		NodeId bb = sharedBuffer.put("bb", eventIds[3], aa, DeweyNumber.fromString("1.0.0.0"));
+		NodeId c = sharedBuffer.put("c", eventIds[4], bb, DeweyNumber.fromString("1.0.0.0.0"));
+
+		Collection<Map<String, List<Event>>> patternsResult = sharedBuffer.extractPatterns(c,
+			DeweyNumber.fromString("1.0.0.0.0"));
+
+		List<String> expectedOrder = new ArrayList<>();
+		expectedOrder.add("a");
+		expectedOrder.add("b");
+		expectedOrder.add("aa");
+		expectedOrder.add("bb");
+		expectedOrder.add("c");
+
+		for (EventId eventId : eventIds) {
+			sharedBuffer.releaseEvent(eventId);
+		}
+
+		List<String> resultOrder = new ArrayList<>(patternsResult.iterator().next().keySet());
+		assertEquals(expectedOrder, resultOrder);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
index 9b120ca..6c7543a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigrationTest.java
@@ -551,7 +551,7 @@ public class CEPMigrationTest {
 	}
 
 	@Test
-	public void testAndOrSubtypConditionsAfterMigration() throws Exception {
+	public void testAndOrSubtypeConditionsAfterMigration() throws Exception {
 
 		KeySelector<Event, Integer> keySelector = new KeySelector<Event, Integer>() {
 			private static final long serialVersionUID = -4873366487571254798L;
@@ -625,7 +625,7 @@ public class CEPMigrationTest {
 			Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new StartFilter())
 					.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
 
@@ -653,7 +653,7 @@ public class CEPMigrationTest {
 				.times(2)
 				.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
 
@@ -684,7 +684,7 @@ public class CEPMigrationTest {
 					// priority queue in CEP operator are correctly checkpointed/restored
 					.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 5a98445..4786484 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -64,6 +64,7 @@ import java.util.Map;
 import java.util.Queue;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.validateMockitoUsage;
 
@@ -462,9 +463,9 @@ public class CEPOperatorTest extends TestLogger {
 		try {
 			harness.open();
 
-			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
+			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "computationStates");
 			final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
-			Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
+			Whitebox.setInternalState(operator, "computationStates", nfaOperatorStateSpy);
 
 			Event startEvent = new Event(42, "c", 1.0);
 			SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -476,7 +477,7 @@ public class CEPOperatorTest extends TestLogger {
 			harness.processElement(new StreamRecord<>(endEvent, 4L));
 
 			// verify the number of invocations NFA is updated
-			Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any());
+			Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());
 
 			// get and verify the output
 			Queue<Object> result = harness.getOutput();
@@ -507,9 +508,9 @@ public class CEPOperatorTest extends TestLogger {
 
 			harness.open();
 
-			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "nfaValueState");
+			final ValueState nfaOperatorState = (ValueState) Whitebox.<ValueState>getInternalState(operator, "computationStates");
 			final ValueState nfaOperatorStateSpy = Mockito.spy(nfaOperatorState);
-			Whitebox.setInternalState(operator, "nfaValueState", nfaOperatorStateSpy);
+			Whitebox.setInternalState(operator, "computationStates", nfaOperatorStateSpy);
 
 			Event startEvent = new Event(42, "c", 1.0);
 			SubEvent middleEvent = new SubEvent(42, "a", 1.0, 10.0);
@@ -521,7 +522,7 @@ public class CEPOperatorTest extends TestLogger {
 			harness.processElement(new StreamRecord<>(endEvent, 4L));
 
 			// verify the number of invocations NFA is updated
-			Mockito.verify(nfaOperatorStateSpy, Mockito.times(2)).update(Mockito.any());
+			Mockito.verify(nfaOperatorStateSpy, Mockito.times(3)).update(Mockito.any());
 
 			// get and verify the output
 			Queue<Object> result = harness.getOutput();
@@ -568,8 +569,8 @@ public class CEPOperatorTest extends TestLogger {
 			assertEquals(2L, harness.numEventTimeTimers());
 			assertEquals(4L, operator.getPQSize(42));
 			assertEquals(1L, operator.getPQSize(43));
-			assertTrue(!operator.hasNonEmptyNFAState(42));
-			assertTrue(!operator.hasNonEmptyNFAState(43));
+			assertTrue(!operator.hasNonEmptySharedBuffer(42));
+			assertTrue(!operator.hasNonEmptySharedBuffer(43));
 
 			harness.processWatermark(new Watermark(2L));
 
@@ -581,9 +582,9 @@ public class CEPOperatorTest extends TestLogger {
 			// for 43 the element entered the NFA and the PQ is empty
 
 			assertEquals(2L, harness.numEventTimeTimers());
-			assertTrue(operator.hasNonEmptyNFAState(42));
+			assertTrue(operator.hasNonEmptySharedBuffer(42));
 			assertEquals(1L, operator.getPQSize(42));
-			assertTrue(operator.hasNonEmptyNFAState(43));
+			assertTrue(operator.hasNonEmptySharedBuffer(43));
 			assertTrue(!operator.hasNonEmptyPQ(43));
 
 			harness.processElement(new StreamRecord<>(startEvent2, 4L));
@@ -605,9 +606,9 @@ public class CEPOperatorTest extends TestLogger {
 			// now we have 1 key because the 43 expired and was removed.
 			// 42 is still there due to startEvent2
 			assertEquals(1L, harness.numEventTimeTimers());
-			assertTrue(operator2.hasNonEmptyNFAState(42));
+			assertTrue(operator2.hasNonEmptySharedBuffer(42));
 			assertTrue(!operator2.hasNonEmptyPQ(42));
-			assertTrue(!operator2.hasNonEmptyNFAState(43));
+			assertTrue(!operator2.hasNonEmptySharedBuffer(43));
 			assertTrue(!operator2.hasNonEmptyPQ(43));
 
 			verifyPattern(harness.getOutput().poll(), startEvent1, middleEvent1, endEvent1);
@@ -622,7 +623,7 @@ public class CEPOperatorTest extends TestLogger {
 			harness.processWatermark(20L);
 			harness.processWatermark(21L);
 
-			assertTrue(!operator2.hasNonEmptyNFAState(42));
+			assertTrue(!operator2.hasNonEmptySharedBuffer(42));
 			assertTrue(!operator2.hasNonEmptyPQ(42));
 			assertEquals(0L, harness.numEventTimeTimers());
 
@@ -665,7 +666,7 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertEquals(1L, harness.numEventTimeTimers());
 			assertEquals(7L, operator.getPQSize(41));
-			assertTrue(!operator.hasNonEmptyNFAState(41));
+			assertTrue(!operator.hasNonEmptySharedBuffer(41));
 
 			harness.processWatermark(new Watermark(2L));
 
@@ -674,7 +675,7 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertEquals(1L, harness.numEventTimeTimers());
 			assertEquals(6L, operator.getPQSize(41));
-			assertTrue(operator.hasNonEmptyNFAState(41)); // processed the first element
+			assertTrue(operator.hasNonEmptySharedBuffer(41)); // processed the first element
 
 			harness.processWatermark(new Watermark(8L));
 
@@ -714,12 +715,12 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertEquals(1L, harness.numEventTimeTimers());
 			assertEquals(0L, operator.getPQSize(41));
-			assertTrue(operator.hasNonEmptyNFAState(41));
+			assertTrue(operator.hasNonEmptySharedBuffer(41));
 
 			harness.processWatermark(new Watermark(17L));
 			verifyWatermark(harness.getOutput().poll(), 17L);
 
-			assertTrue(!operator.hasNonEmptyNFAState(41));
+			assertTrue(!operator.hasNonEmptySharedBuffer(41));
 			assertTrue(!operator.hasNonEmptyPQ(41));
 			assertEquals(0L, harness.numEventTimeTimers());
 		} finally {
@@ -800,8 +801,8 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertTrue(!operator.hasNonEmptyPQ(42));
 			assertTrue(!operator.hasNonEmptyPQ(43));
-			assertTrue(operator.hasNonEmptyNFAState(42));
-			assertTrue(operator.hasNonEmptyNFAState(43));
+			assertTrue(operator.hasNonEmptySharedBuffer(42));
+			assertTrue(operator.hasNonEmptySharedBuffer(43));
 
 			harness.setProcessingTime(3L);
 
@@ -834,10 +835,10 @@ public class CEPOperatorTest extends TestLogger {
 
 			harness.setProcessingTime(21L);
 
-			assertTrue(operator2.hasNonEmptyNFAState(42));
+			assertTrue(operator2.hasNonEmptySharedBuffer(42));
 
 			harness.processElement(new StreamRecord<>(startEvent1, 21L));
-			assertTrue(operator2.hasNonEmptyNFAState(42));
+			assertTrue(operator2.hasNonEmptySharedBuffer(42));
 
 			harness.setProcessingTime(49L);
 
@@ -845,7 +846,7 @@ public class CEPOperatorTest extends TestLogger {
 			harness.processElement(new StreamRecord<>(new Event(42, "foobar", 1.0), 2L));
 
 			// the pattern expired
-			assertTrue(!operator2.hasNonEmptyNFAState(42));
+			assertTrue(!operator2.hasNonEmptySharedBuffer(42));
 
 			assertEquals(0L, harness.numEventTimeTimers());
 			assertTrue(!operator2.hasNonEmptyPQ(42));
@@ -910,7 +911,7 @@ public class CEPOperatorTest extends TestLogger {
 
 				@Override
 				public NFA<Event> createNFA() {
-					return NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+					return NFACompiler.compileFactory(pattern, false).createNFA();
 				}
 			});
 
@@ -988,12 +989,12 @@ public class CEPOperatorTest extends TestLogger {
 			harness
 				.processElement(new StreamRecord<>(new SubEvent(42, "barfoo", 1.0, 5.0), 0L));
 
-			assertTrue(!operator.hasNonEmptyNFAState(42));
-			assertTrue(!operator.hasNonEmptyNFAState(43));
+			assertTrue(!operator.hasNonEmptySharedBuffer(42));
+			assertTrue(!operator.hasNonEmptySharedBuffer(43));
 
 			harness.setProcessingTime(3L);
-			assertTrue(operator.hasNonEmptyNFAState(42));
-			assertTrue(operator.hasNonEmptyNFAState(43));
+			assertTrue(operator.hasNonEmptySharedBuffer(42));
+			assertTrue(operator.hasNonEmptySharedBuffer(43));
 
 			harness.processElement(new StreamRecord<>(middleEvent2, 3L));
 			harness.processElement(new StreamRecord<>(middleEvent1, 3L));
@@ -1047,14 +1048,14 @@ public class CEPOperatorTest extends TestLogger {
 
 			assertTrue(operator.hasNonEmptyPQ(42));
 			assertTrue(operator.hasNonEmptyPQ(43));
-			assertTrue(!operator.hasNonEmptyNFAState(42));
-			assertTrue(!operator.hasNonEmptyNFAState(43));
+			assertFalse(operator.hasNonEmptySharedBuffer(42));
+			assertFalse(operator.hasNonEmptySharedBuffer(43));
 
 			harness.processWatermark(3L);
-			assertTrue(!operator.hasNonEmptyPQ(42));
-			assertTrue(!operator.hasNonEmptyPQ(43));
-			assertTrue(operator.hasNonEmptyNFAState(42));
-			assertTrue(operator.hasNonEmptyNFAState(43));
+			assertFalse(operator.hasNonEmptyPQ(42));
+			assertFalse(operator.hasNonEmptyPQ(43));
+			assertTrue(operator.hasNonEmptySharedBuffer(42));
+			assertTrue(operator.hasNonEmptySharedBuffer(43));
 
 			harness.processElement(new StreamRecord<>(startEvent2, 4L));
 			harness.processElement(new StreamRecord<Event>(middleEvent2, 5L));
@@ -1224,7 +1225,7 @@ public class CEPOperatorTest extends TestLogger {
 					// priority queue in CEP operator are correctly checkpointed/restored
 					.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
 
@@ -1275,7 +1276,7 @@ public class CEPOperatorTest extends TestLogger {
 				}
 			}).within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
 
@@ -1319,7 +1320,7 @@ public class CEPOperatorTest extends TestLogger {
 				}
 			}).within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 8a73556..c4b147c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -26,8 +26,10 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -366,20 +368,23 @@ public class CEPRescalingTest {
 	}
 
 	private KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> getTestHarness(
-		int maxParallelism,
-		int taskParallelism,
-		int subtaskIdx) throws Exception {
+			int maxParallelism,
+			int taskParallelism,
+			int subtaskIdx) throws Exception {
 
 		KeySelector<Event, Integer> keySelector = new TestKeySelector();
-		return new KeyedOneInputStreamOperatorTestHarness<>(
-			getKeyedCepOpearator(
-				false,
-				new NFAFactory()),
-			keySelector,
-			BasicTypeInfo.INT_TYPE_INFO,
-			maxParallelism,
-			taskParallelism,
-			subtaskIdx);
+		KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
+				new KeyedOneInputStreamOperatorTestHarness<>(
+						getKeyedCepOpearator(
+								false,
+								new NFAFactory()),
+						keySelector,
+						BasicTypeInfo.INT_TYPE_INFO,
+						maxParallelism,
+						taskParallelism,
+						subtaskIdx);
+		harness.setStateBackend(new RocksDBStateBackend(new MemoryStateBackend()));
+		return harness;
 	}
 
 	private static class NFAFactory implements NFACompiler.NFAFactory<Event> {
@@ -427,7 +432,7 @@ public class CEPRescalingTest {
 				// priority queue in CEP operator are correctly checkpointed/restored
 				.within(Time.milliseconds(10L));
 
-			return NFACompiler.compile(pattern, Event.createTypeSerializer(), handleTimeout);
+			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
index 3753f34..abc4b18 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
@@ -84,6 +84,8 @@ public class CepOperatorTestUtilities {
 			comparator,
 			null,
 			new PatternSelectFunction<Event, Map<String, List<Event>>>() {
+				private static final long serialVersionUID = -7143807777582726991L;
+
 				@Override
 				public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
 					return pattern;

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java
new file mode 100644
index 0000000..f2c6613
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFAUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+
+import static org.apache.flink.cep.nfa.compiler.NFACompiler.compileFactory;
+
+/**
+ * Utility methods for constructing an {@link NFA}.
+ */
+public class NFAUtils {
+
+	/**
+	 * Compiles the given pattern into a {@link NFA}.
+	 *
+	 * @param pattern         Definition of sequence pattern
+	 * @param timeoutHandling True if the NFA shall return timed out event patterns
+	 * @param <T>             Type of the input events
+	 * @return Non-deterministic finite automaton representing the given pattern
+	 */
+	public static <T> NFA<T> compile(Pattern<T, ?> pattern, boolean timeoutHandling) {
+		NFACompiler.NFAFactory<T> factory = compileFactory(pattern, timeoutHandling);
+		return factory.createNFA();
+	}
+
+	private NFAUtils() {
+	}
+}
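For readers skimming the diff, here is a minimal, purely illustrative sketch of how a test could use this helper. The pattern, its conditions, and the event names are assumptions made for the example and are not part of this commit:

	// Hypothetical usage sketch of NFAUtils.compile; the pattern definition is illustrative only.
	Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
		.where(new SimpleCondition<Event>() {
			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("a");
			}
		})
		.followedBy("end")
		.where(new SimpleCondition<Event>() {
			@Override
			public boolean filter(Event value) throws Exception {
				return value.getName().equals("b");
			}
		});

	// Compile once in the test instead of calling NFACompiler directly.
	NFA<Event> nfa = NFAUtils.compile(pattern, false);
	NFAState nfaState = nfa.createInitialNFAState();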

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
new file mode 100644
index 0000000..2c7b979
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.api.common.state.AggregatingState;
+import org.apache.flink.api.common.state.AggregatingStateDescriptor;
+import org.apache.flink.api.common.state.FoldingState;
+import org.apache.flink.api.common.state.FoldingStateDescriptor;
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Extends {@link SharedBuffer} with methods for checking the number of state accesses. It does not use a real
+ * StateBackend; instead, state is stubbed out with plain Java collections.
+ */
+public class TestSharedBuffer<V> extends SharedBuffer<V> {
+
+	private final MockKeyedStateStore keyedStateStore;
+
+	private TestSharedBuffer(MockKeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
+		super(stateStore, valueSerializer);
+		this.keyedStateStore = stateStore;
+	}
+
+	public long getStateWrites() {
+		return keyedStateStore.stateWrites;
+	}
+
+	public long getStateReads() {
+		return keyedStateStore.stateReads;
+	}
+
+	public long getStateAccesses() {
+		return getStateWrites() + getStateReads();
+	}
+
+	/**
+	 * Creates an instance of {@link TestSharedBuffer}.
+	 *
+	 * @param typeSerializer serializer used to serialize incoming events
+	 * @param <T>            type of incoming events
+	 * @return TestSharedBuffer instance
+	 */
+	public static <T> TestSharedBuffer<T> createTestBuffer(TypeSerializer<T> typeSerializer) {
+		return new TestSharedBuffer<>(new MockKeyedStateStore(), typeSerializer);
+	}
+
+	private static class MockKeyedStateStore implements KeyedStateStore {
+
+		private long stateWrites = 0;
+		private long stateReads = 0;
+
+		@Override
+		public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+			return new ValueState<T>() {
+
+				private T value;
+
+				@Override
+				public T value() throws IOException {
+					stateReads++;
+					return value;
+				}
+
+				@Override
+				public void update(T value) throws IOException {
+					stateWrites++;
+					this.value = value;
+				}
+
+				@Override
+				public void clear() {
+					this.value = null;
+				}
+			};
+		}
+
+		@Override
+		public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+			throw new UnsupportedOperationException();
+		}
+
+		@Override
+		public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+			return new MapState<UK, UV>() {
+
+				private Map<UK, UV> values;
+
+				private Map<UK, UV> getOrSetMap() {
+					if (values == null) {
+						this.values = new HashMap<>();
+					}
+					return values;
+				}
+
+				@Override
+				public UV get(UK key) throws Exception {
+					stateReads++;
+					if (values == null) {
+						return null;
+					}
+
+					return values.get(key);
+				}
+
+				@Override
+				public void put(UK key, UV value) throws Exception {
+					stateWrites++;
+					getOrSetMap().put(key, value);
+				}
+
+				@Override
+				public void putAll(Map<UK, UV> map) throws Exception {
+					stateWrites++;
+					getOrSetMap().putAll(map);
+				}
+
+				@Override
+				public void remove(UK key) throws Exception {
+					if (values == null) {
+						return;
+					}
+
+					stateWrites++;
+					values.remove(key);
+				}
+
+				@Override
+				public boolean contains(UK key) throws Exception {
+					if (values == null) {
+						return false;
+					}
+
+					stateReads++;
+					return values.containsKey(key);
+				}
+
+				@Override
+				public Iterable<Map.Entry<UK, UV>> entries() throws Exception {
+					if (values == null) {
+						return Collections.emptyList();
+					}
+
+					return () -> new CountingIterator<>(values.entrySet().iterator());
+				}
+
+				@Override
+				public Iterable<UK> keys() throws Exception {
+					if (values == null) {
+						return Collections.emptyList();
+					}
+
+					return () -> new CountingIterator<>(values.keySet().iterator());
+				}
+
+				@Override
+				public Iterable<UV> values() throws Exception {
+					if (values == null) {
+						return Collections.emptyList();
+					}
+
+					return () -> new CountingIterator<>(values.values().iterator());
+				}
+
+				@Override
+				public Iterator<Map.Entry<UK, UV>> iterator() throws Exception {
+					if (values == null) {
+						return Iterators.emptyIterator();
+					}
+
+					return new CountingIterator<>(values.entrySet().iterator());
+				}
+
+				@Override
+				public void clear() {
+					stateWrites++;
+					this.values = null;
+				}
+			};
+		}
+
+		private class CountingIterator<T> implements Iterator<T> {
+
+			private final Iterator<T> iterator;
+
+			CountingIterator(Iterator<T> iterator) {
+				this.iterator = iterator;
+			}
+
+			@Override
+			public boolean hasNext() {
+				return iterator.hasNext();
+			}
+
+			@Override
+			public T next() {
+				stateReads++;
+				return iterator.next();
+			}
+		}
+	}
+
+}
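As a rough, hypothetical sketch of how the counting stub can be used: the calls below mirror the SharedBufferTest changes above, while the specific assertions are illustrative assumptions and not taken from this commit. The fragment is assumed to live inside a test method declared with throws Exception:

	// Illustrative only: exercise the buffer through the stubbed KeyedStateStore and check the counters.
	TestSharedBuffer<Event> buffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());

	EventId id = buffer.registerEvent(new Event(1, "start", 1.0), 1L);
	buffer.put("start", id, DeweyNumber.fromString("1"));

	// Every read and write above went through the mocked state, so the counters must have moved.
	assertTrue(buffer.getStateAccesses() > 0);
	assertEquals(buffer.getStateAccesses(), buffer.getStateReads() + buffer.getStateWrites());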