You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/09 08:13:10 UTC

[flink] 02/05: [hotfix][cep] Introduced nfa test harness

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abe7ae23d5dc969b9034b206b8d492903c790b30
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 12 09:16:07 2018 +0100

    [hotfix][cep] Introduced nfa test harness
---
 .../apache/flink/cep/nfa/AfterMatchSkipITCase.java | 110 ++++++------
 .../org/apache/flink/cep/nfa/GreedyITCase.java     |   4 +-
 .../java/org/apache/flink/cep/nfa/GroupITCase.java |   8 +-
 .../flink/cep/nfa/IterativeConditionsITCase.java   |   4 +-
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   |  91 ++++------
 .../apache/flink/cep/nfa/NFAStateAccessTest.java   |  41 ++---
 .../flink/cep/nfa/NFAStatusChangeITCase.java       |  20 ++-
 .../java/org/apache/flink/cep/nfa/NFATest.java     | 148 +++++++---------
 .../org/apache/flink/cep/nfa/NotPatternITCase.java |   4 +-
 .../apache/flink/cep/nfa/SameElementITCase.java    |  11 +-
 .../apache/flink/cep/nfa/TimesOrMoreITCase.java    |   4 +-
 .../org/apache/flink/cep/nfa/TimesRangeITCase.java |   4 +-
 .../apache/flink/cep/nfa/UntilConditionITCase.java |  36 ++--
 .../org/apache/flink/cep/utils/NFATestHarness.java | 191 +++++++++++++++++++++
 .../flink/cep/{nfa => utils}/NFATestUtilities.java |  60 +------
 15 files changed, 417 insertions(+), 319 deletions(-)

diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index f6bf0a8..89f3bf8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -22,10 +22,10 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.aftermatch.SkipPastLastStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -40,9 +40,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
-import static org.apache.flink.cep.utils.NFAUtils.compile;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -77,9 +75,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(3);
 
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3),
@@ -141,9 +139,11 @@ public class AfterMatchSkipITCase extends TestLogger{
 					}
 				});
 
-			NFA<Event> nfa = compile(pattern, false);
+			NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+				.withAfterMatchSkipStrategy(skipStrategy)
+				.build();
 
-			return feedNFA(streamEvents, nfa, skipStrategy);
+			return nfaTestHarness.feedRecords(streamEvents);
 		}
 	}
 
@@ -198,9 +198,11 @@ public class AfterMatchSkipITCase extends TestLogger{
 					}
 				}).oneOrMore();
 
-			NFA<Event> nfa = compile(pattern, false);
+			NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+				.withAfterMatchSkipStrategy(skipStrategy)
+				.build();
 
-			return feedNFA(streamEvents, nfa, skipStrategy);
+			return nfaTestHarness.feedRecords(streamEvents);
 		}
 	}
 
@@ -231,9 +233,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(3);
 
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3),
@@ -275,9 +277,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(2);
 
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, ab2, ab3, ab4),
@@ -318,9 +320,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("b");
 			}
 		}).times(2);
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, ab2, ab3, ab4),
@@ -376,9 +378,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("d");
 				}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Collections.singletonList(
 			Lists.newArrayList(a1, b1, c1, d1)
@@ -415,9 +417,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}
 		);
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(a2, b2)
@@ -459,9 +461,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("c");
 			}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, c1),
@@ -498,9 +500,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("c");
 			}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, c1),
@@ -543,9 +545,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("b");
 			}
 		}).oneOrMore().consecutive();
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, b1),
@@ -573,9 +575,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}
 		);
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		//skip to first element of a match should throw exception if they are enabled,
 		//this mode is used in MATCH RECOGNIZE which assumes that skipping to first element
@@ -645,9 +647,11 @@ public class AfterMatchSkipITCase extends TestLogger{
 						return value.getName().contains("c");
 					}
 				});
-			NFA<Event> nfa = compile(pattern, false);
+			NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+				.withAfterMatchSkipStrategy(skipStrategy)
+				.build();
 
-			return feedNFA(streamEvents, nfa, skipStrategy);
+			return nfaTestHarness.feedRecords(streamEvents);
 		}
 	}
 
@@ -686,9 +690,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("b");
 			}
 		}).oneOrMore().consecutive();
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, b1),
@@ -728,9 +732,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Collections.singletonList(
 			Lists.newArrayList(a1, a2, a3, b1)
@@ -768,9 +772,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3, b1),
@@ -809,9 +813,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3, b1),
@@ -851,9 +855,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3, b1),
@@ -913,9 +917,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("d");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a, b, c1, c2, c3, d),
@@ -962,9 +966,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					ctx.getEventsForPattern("a").iterator().next().getPrice() == value.getPrice();
 			}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, c1, b2),
@@ -991,22 +995,10 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(2);
 
-		NFA<Event> nfa = compile(pattern, false);
-
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		NFAState nfaState = nfa.createInitialNFAState();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-				nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
-				nfa.process(
-					sharedBufferAccessor,
-					nfaState,
-					inputEvent.getValue(),
-					inputEvent.getTimestamp(),
-					matchSkipStrategy);
-			}
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+
+		nfaTestHarness.feedRecords(inputEvents);
 
 		assertThat(sharedBuffer.isEmpty(), Matchers.is(true));
 	}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
index 9e00130..e33e59a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
index 93116ff..9387966 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.GroupPattern;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -32,8 +33,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
@@ -1077,7 +1078,8 @@ public class GroupITCase extends TestLogger {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(c, a1, b1, d),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
index 9012379..ce28ddd 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
@@ -33,8 +33,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 2391d54..62d76db 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -21,11 +21,13 @@ package org.apache.flink.cep.nfa;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -48,8 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
@@ -414,7 +416,11 @@ public class NFAITCase extends TestLogger {
 			Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
 				nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp());
 			Collection<Map<String, List<Event>>> matchedPatterns =
-				nfa.process(sharedBufferAccessor, nfaState, event.getValue(), event.getTimestamp());
+				nfa.process(sharedBufferAccessor,
+					nfaState,
+					event.getValue(),
+					event.getTimestamp(),
+					AfterMatchSkipStrategy.noSkip());
 
 			resultingPatterns.addAll(matchedPatterns);
 			resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2342,11 +2348,13 @@ public class NFAITCase extends TestLogger {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 2);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 3);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent3, 4);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+
+		nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent1, 2));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent2, 3));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent3, 4));
+		nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2387,10 +2395,11 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent, 5);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent, 5));
+		nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2432,11 +2441,12 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2478,11 +2488,12 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2734,25 +2745,12 @@ public class NFAITCase extends TestLogger {
 					}
 				}).times(3).consecutive();
 
-		NFA<Event> nfa = compile(pattern, false);
-
-		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
-
-		NFAState nfaState = nfa.createInitialNFAState();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				sharedBufferAccessor,
-				nfaState,
-				inputEvent.getValue(),
-				inputEvent.getTimestamp());
-
-			resultingPatterns.addAll(patterns);
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
+		Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
 
 		Assert.assertEquals(1L, resultingPatterns.size());
 
-		Map<String, List<Event>> match = resultingPatterns.get(0);
+		Map<String, List<Event>> match = resultingPatterns.iterator().next();
 		Assert.assertArrayEquals(
 				match.get("start").toArray(),
 				Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray());
@@ -2809,25 +2807,12 @@ public class NFAITCase extends TestLogger {
 				}
 			});
 
-		NFA<Event> nfa = compile(pattern, false);
-
-		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
-
-		NFAState nfaState = nfa.createInitialNFAState();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				sharedBufferAccessor,
-				nfaState,
-				inputEvent.getValue(),
-				inputEvent.getTimestamp());
-
-			resultingPatterns.addAll(patterns);
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
+		Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
 
 		Assert.assertEquals(1L, resultingPatterns.size());
 
-		Map<String, List<Event>> match = resultingPatterns.get(0);
+		Map<String, List<Event>> match = resultingPatterns.iterator().next();
 
 		List<String> expectedOrder = Lists.newArrayList("a", "b", "aa", "bb", "ab");
 		List<String> resultOrder = new ArrayList<>();
@@ -2847,8 +2832,8 @@ public class NFAITCase extends TestLogger {
 
 		try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
 			NFA<Event> nfa = compile(pattern, false);
-			nfa.process(accessor, nfa.createInitialNFAState(), a, 1);
-			nfa.process(accessor, nfa.createInitialNFAState(), b, 2);
+			nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip());
+			nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip());
 			Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
 			nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
 			Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
index 227fe4e..1be5f4f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -32,7 +32,6 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -99,22 +98,13 @@ public class NFAStateAccessTest {
 			}
 		});
 
-		NFA<Event> nfa = compile(pattern, false);
-
 		TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
-					nfa.process(
-					accessor,
-					nfa.createInitialNFAState(),
-					inputEvent.getValue(),
-					inputEvent.getTimestamp());
-			}
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+		nfaTestHarness.consumeRecords(inputEvents);
 
-		assertEquals(2, sharedBuffer.getStateReads());
-		assertEquals(3, sharedBuffer.getStateWrites());
-		assertEquals(5, sharedBuffer.getStateAccesses());
+		assertEquals(58, sharedBuffer.getStateReads());
+		assertEquals(33, sharedBuffer.getStateWrites());
+		assertEquals(91, sharedBuffer.getStateAccesses());
 	}
 
 	@Test
@@ -182,21 +172,12 @@ public class NFAStateAccessTest {
 			}
 		});
 
-		NFA<Event> nfa = compile(pattern, false);
-
 		TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
-					nfa.process(
-					accessor,
-					nfa.createInitialNFAState(),
-					inputEvent.getValue(),
-					inputEvent.getTimestamp());
-			}
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+		nfaTestHarness.consumeRecords(inputEvents);
 
-		assertEquals(8, sharedBuffer.getStateReads());
-		assertEquals(12, sharedBuffer.getStateWrites());
-		assertEquals(20, sharedBuffer.getStateAccesses());
+		assertEquals(90, sharedBuffer.getStateReads());
+		assertEquals(31, sharedBuffer.getStateWrites());
+		assertEquals(121, sharedBuffer.getStateAccesses());
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 8cbfba1..d0a7853 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
@@ -47,6 +48,7 @@ public class NFAStatusChangeITCase {
 
 	private SharedBuffer<Event> sharedBuffer;
 	private SharedBufferAccessor<Event> sharedBufferAccessor;
+	private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
 
 	@Before
 	public void init() {
@@ -95,32 +97,32 @@ public class NFAStatusChangeITCase {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy);
 		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
 
 		// the status of the queue of ComputationStatus changed,
 		// more than one ComputationStatus is generated by the event from some ComputationStatus
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy);
 		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
@@ -164,10 +166,10 @@ public class NFAStatusChangeITCase {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy);
 		assertTrue(nfaState.isStateChanged());
 	}
 
@@ -193,7 +195,7 @@ public class NFAStatusChangeITCase {
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index e5f4c9e..638b8b5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -19,13 +19,11 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -39,10 +37,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
@@ -91,9 +87,7 @@ public class NFATest extends TestLogger {
 		states.add(endState);
 		states.add(endingState);
 
-		NFA<Event> nfa = new NFA<>(states, 0, false);
-
-		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+		List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
 
 		Map<String, List<Event>> firstPattern = new HashMap<>();
 		firstPattern.put("start", Collections.singletonList(new Event(1, "start", 1.0)));
@@ -106,14 +100,16 @@ public class NFATest extends TestLogger {
 		expectedPatterns.add(firstPattern);
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		NFA<Event> nfa = new NFA<>(states, 0, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
 
 	@Test
 	public void testTimeoutWindowPruning() throws Exception {
-		NFA<Event> nfa = createStartEndNFA();
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -121,7 +117,7 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
 
-		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+		List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
 
 		Map<String, List<Event>> secondPattern = new HashMap<>();
 		secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0)));
@@ -129,7 +125,10 @@ public class NFATest extends TestLogger {
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		NFA<Event> nfa = createStartEndNFA();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -140,15 +139,17 @@ public class NFATest extends TestLogger {
 	 */
 	@Test
 	public void testWindowBorders() throws Exception {
-		NFA<Event> nfa = createStartEndNFA();
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
 		streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L));
 
-		Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet();
+		List<Map<String, List<Event>>> expectedPatterns = Collections.emptyList();
+
+		NFA<Event> nfa = createStartEndNFA();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -159,7 +160,6 @@ public class NFATest extends TestLogger {
 	 */
 	@Test
 	public void testTimeoutWindowPruningWindowBorders() throws Exception {
-		NFA<Event> nfa = createStartEndNFA();
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -167,7 +167,7 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L));
 
-		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+		List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
 
 		Map<String, List<Event>> secondPattern = new HashMap<>();
 		secondPattern.put("start", Collections.singletonList(new Event(2, "start", 2.0)));
@@ -175,30 +175,12 @@ public class NFATest extends TestLogger {
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		NFA<Event> nfa = createStartEndNFA();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
 
-		assertEquals(expectedPatterns, actualPatterns);
-	}
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
-	public Collection<Map<String, List<Event>>> runNFA(
-		NFA<Event> nfa, NFAState nfaState, List<StreamRecord<Event>> inputs) throws Exception {
-		Set<Map<String, List<Event>>> actualPatterns = new HashSet<>();
-
-		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-			for (StreamRecord<Event> streamEvent : inputs) {
-				nfa.advanceTime(sharedBufferAccessor, nfaState, streamEvent.getTimestamp());
-				Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
-					sharedBufferAccessor,
-					nfaState,
-					streamEvent.getValue(),
-					streamEvent.getTimestamp());
-
-				actualPatterns.addAll(matchedPatterns);
-			}
-		}
-
-		return actualPatterns;
+		assertEquals(expectedPatterns, actualPatterns);
 	}
 
 	@Test
@@ -289,51 +271,49 @@ public class NFATest extends TestLogger {
 		patterns.add(pattern2);
 		patterns.add(pattern3);
 
-		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-
-			for (Pattern<Event, ?> p : patterns) {
-				NFA<Event> nfa = compile(p, false);
-
-				Event a = new Event(40, "a", 1.0);
-				Event b = new Event(41, "b", 2.0);
-				Event c = new Event(42, "c", 3.0);
-				Event b1 = new Event(41, "b", 3.0);
-				Event b2 = new Event(41, "b", 4.0);
-				Event b3 = new Event(41, "b", 5.0);
-				Event d = new Event(43, "d", 4.0);
-
-				NFAState nfaState = nfa.createInitialNFAState();
-
-				nfa.process(sharedBufferAccessor, nfaState, a, 1);
-				nfa.process(sharedBufferAccessor, nfaState, b, 2);
-				nfa.process(sharedBufferAccessor, nfaState, c, 3);
-				nfa.process(sharedBufferAccessor, nfaState, b1, 4);
-				nfa.process(sharedBufferAccessor, nfaState, b2, 5);
-				nfa.process(sharedBufferAccessor, nfaState, b3, 6);
-				nfa.process(sharedBufferAccessor, nfaState, d, 7);
-				nfa.process(sharedBufferAccessor, nfaState, a, 8);
-
-				NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
-
-				//serialize
-				ByteArrayOutputStream baos = new ByteArrayOutputStream();
-				serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
-				baos.close();
-
-				// copy
-				ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
-				ByteArrayOutputStream out = new ByteArrayOutputStream();
-				serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
-				in.close();
-				out.close();
-
-				// deserialize
-				ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
-				NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
-				bais.close();
-				assertEquals(nfaState, copy);
-			}
+		for (Pattern<Event, ?> p : patterns) {
+			NFA<Event> nfa = compile(p, false);
+
+			Event a = new Event(40, "a", 1.0);
+			Event b = new Event(41, "b", 2.0);
+			Event c = new Event(42, "c", 3.0);
+			Event b1 = new Event(41, "b", 3.0);
+			Event b2 = new Event(41, "b", 4.0);
+			Event b3 = new Event(41, "b", 5.0);
+			Event d = new Event(43, "d", 4.0);
+
+			NFAState nfaState = nfa.createInitialNFAState();
+
+			NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+
+			nfaTestHarness.consumeRecord(new StreamRecord<>(a, 1));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b, 2));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(c, 3));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b1, 4));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b2, 5));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b3, 6));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(d, 7));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(a, 8));
+
+			NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
+
+			//serialize
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
+			baos.close();
+
+			// copy
+			ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
+			ByteArrayOutputStream out = new ByteArrayOutputStream();
+			serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
+			in.close();
+			out.close();
+
+			// deserialize
+			ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
+			NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
+			bais.close();
+			assertEquals(nfaState, copy);
 		}
 	}
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
index 11a8484..6f33218 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index b7f1177..3d277af 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -33,8 +34,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
@@ -141,8 +142,9 @@ public void testClearingBuffer() throws Exception {
 	NFA<Event> nfa = compile(pattern, false);
 
 	NFAState nfaState = nfa.createInitialNFAState();
+	NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+	List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, b1, c1, d)
 	));
@@ -186,8 +188,9 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 	NFA<Event> nfa = compile(pattern, false);
 
 	NFAState nfaState = nfa.createInitialNFAState();
+	NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+	List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, d1, d2, d3),
 		Lists.newArrayList(a1, d1, d2),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
index 049c84b..93d8c56 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 203a1c2..dd85d87 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index be0f28b..39b23b4 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -31,8 +32,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
@@ -92,8 +93,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -142,8 +144,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -193,8 +196,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -244,8 +248,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
@@ -292,8 +297,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -342,8 +348,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -394,8 +401,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -525,8 +533,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -579,8 +588,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -631,10 +641,10 @@ public class UntilConditionITCase {
 		});
 
 		NFA<Event> nfa = compile(pattern, false);
-
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
new file mode 100644
index 0000000..dd080fb
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.NFAState;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test harness for setting up {@link NFA}.
+ */
+public final class NFATestHarness {
+
+	private final SharedBuffer<Event> sharedBuffer;
+	private final NFA<Event> nfa;
+	private final NFAState nfaState;
+	private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+	private NFATestHarness(
+			SharedBuffer<Event> sharedBuffer,
+			NFA<Event> nfa,
+			NFAState nfaState,
+			AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		this.sharedBuffer = sharedBuffer;
+		this.nfa = nfa;
+		this.nfaState = nfaState;
+		this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+	}
+
+	/**
+	 * Constructs a test harness starting from a given {@link Pattern}.
+	 */
+	public static NFATestHarnessBuilderPattern forPattern(Pattern<Event, ?> pattern) {
+		return new NFATestHarnessBuilderPattern(pattern);
+	}
+
+	/**
+	 * Constructs a test harness starting from a given {@link NFA}.
+	 */
+	public static NFATestHarnessBuilderNFA forNFA(NFA<Event> nfa) {
+		return new NFATestHarnessBuilderNFA(nfa);
+	}
+
+	public List<List<Event>> feedRecords(List<StreamRecord<Event>> inputEvents) throws Exception {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			resultingPatterns.addAll(feedRecord(inputEvent));
+		}
+		return resultingPatterns;
+	}
+
+	public List<List<Event>> feedRecord(StreamRecord<Event> inputEvent) throws Exception {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+		Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
+		for (Map<String, List<Event>> p : matches) {
+			List<Event> res = new ArrayList<>();
+			for (List<Event> le : p.values()) {
+				res.addAll(le);
+			}
+			resultingPatterns.add(res);
+		}
+		return resultingPatterns;
+	}
+
+	public Collection<Map<String, List<Event>>> consumeRecords(Collection<StreamRecord<Event>> inputEvents) throws Exception {
+		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			resultingPatterns.addAll(consumeRecord(inputEvent));
+		}
+
+		return resultingPatterns;
+	}
+
+	public Collection<Map<String, List<Event>>> consumeRecord(StreamRecord<Event> inputEvent) throws Exception {
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
+			return nfa.process(
+				sharedBufferAccessor,
+				nfaState,
+				inputEvent.getValue(),
+				inputEvent.getTimestamp(),
+				afterMatchSkipStrategy);
+		}
+	}
+
+	/**
+	 * Builder for {@link NFATestHarness} that encapsulates {@link Pattern}.
+	 */
+	public static class NFATestHarnessBuilderPattern extends NFATestHarnessBuilderBase {
+
+		private final Pattern<Event, ?> pattern;
+		private boolean timeoutHandling = false;
+
+		NFATestHarnessBuilderPattern(Pattern<Event, ?> pattern) {
+			super(pattern.getAfterMatchSkipStrategy());
+			this.pattern = pattern;
+		}
+
+		public NFATestHarnessBuilderBase withTimeoutHandling() {
+			this.timeoutHandling = true;
+			return this;
+		}
+
+		@Override
+		public NFATestHarness build() {
+			NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
+			return new NFATestHarness(
+				sharedBuffer,
+				nfa,
+				nfa.createInitialNFAState(),
+				afterMatchSkipStrategy);
+		}
+	}
+
+	/**
+	 * Builder for {@link NFATestHarness} that encapsulates {@link NFA}.
+	 */
+	public static class NFATestHarnessBuilderNFA extends NFATestHarnessBuilderBase {
+
+		private final NFA<Event> nfa;
+		private NFAState nfaState;
+
+		NFATestHarnessBuilderNFA(NFA<Event> nfa) {
+			super(AfterMatchSkipStrategy.noSkip());
+			this.nfa = nfa;
+			this.nfaState = nfa.createInitialNFAState();
+		}
+
+		public NFATestHarnessBuilderBase withNFAState(NFAState nfaState) {
+			this.nfaState = nfaState;
+			return this;
+		}
+
+		@Override
+		public NFATestHarness build() {
+			return new NFATestHarness(sharedBuffer, nfa, nfaState, afterMatchSkipStrategy);
+		}
+	}
+
+	/**
+	 * Common builder, which can be used independent if we start with {@link Pattern} or {@link NFA}.
+	 * Enables to provide custom services like {@link SharedBuffer} etc.
+	 */
+	public abstract static class NFATestHarnessBuilderBase {
+
+		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+		NFATestHarnessBuilderBase(AfterMatchSkipStrategy skipStrategy) {
+			this.afterMatchSkipStrategy = skipStrategy;
+		}
+
+		public NFATestHarnessBuilderBase withSharedBuffer(SharedBuffer<Event> sharedBuffer) {
+			this.sharedBuffer = sharedBuffer;
+			return this;
+		}
+
+		public NFATestHarnessBuilderBase withAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
+			this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+			return this;
+		}
+
+		public abstract NFATestHarness build();
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
similarity index 57%
rename from flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
index 91e490e..ac45798 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
@@ -16,77 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.nfa;
+package org.apache.flink.cep.utils;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
-import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.junit.Assert;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Base method for IT tests of {@link NFA}. It provides utility methods.
  */
 public class NFATestUtilities {
 
-	public static List<List<Event>> feedNFA(
-		List<StreamRecord<Event>> inputEvents,
-		NFA<Event> nfa) throws Exception {
-		return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), AfterMatchSkipStrategy.noSkip());
-	}
-
-	public static List<List<Event>> feedNFA(
-			List<StreamRecord<Event>> inputEvents,
-			NFA<Event> nfa,
-			NFAState nfaState) throws Exception {
-		return feedNFA(inputEvents, nfa, nfaState, AfterMatchSkipStrategy.noSkip());
-	}
-
-	public static List<List<Event>> feedNFA(
-		List<StreamRecord<Event>> inputEvents,
-		NFA<Event> nfa,
-		AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-		return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), afterMatchSkipStrategy);
-	}
-
+	@Deprecated
 	public static List<List<Event>> feedNFA(
 			List<StreamRecord<Event>> inputEvents,
-			NFA<Event> nfa,
-			NFAState nfaState,
-			AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-				nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
-				Collection<Map<String, List<Event>>> patterns = nfa.process(
-					sharedBufferAccessor,
-					nfaState,
-					inputEvent.getValue(),
-					inputEvent.getTimestamp(),
-					afterMatchSkipStrategy);
-				for (Map<String, List<Event>> p: patterns) {
-					List<Event> res = new ArrayList<>();
-					for (List<Event> le: p.values()) {
-						res.addAll(le);
-					}
-					resultingPatterns.add(res);
-				}
-			}
-		}
-		return resultingPatterns;
+			NFA<Event> nfa) throws Exception {
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+		return nfaTestHarness.feedRecords(inputEvents);
 	}
 
 	public static void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {