You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/09 08:13:08 UTC

[flink] branch master updated (9742ef7 -> 47e9ccf)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 9742ef7  [FLINK-11251][metrics] Exclude variable values from logical scope of generic groups
     new 5e46487  [hotfix][cep] Change contract of cep TimerContext#timestamp to never return null
     new abe7ae2  [hotfix][cep] Introduced nfa test harness
     new 60f8826  [FLINK-10596][cep] Added access to timer service in IterativeCondition
     new 0a46510  [hotfix][cep] Renamed TimerContext to TimeContext
     new 47e9ccf  [FLINK-10596][cep, docs] Added description of TimeContext to docs

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/libs/cep.md                               |  41 +++-
 ...ernStreamScalaJavaAPIInteroperabilityTest.scala |   2 +-
 .../cep/functions/PatternProcessFunction.java      |   4 +-
 .../adaptors/PatternTimeoutFlatSelectAdapter.java  |   3 +-
 .../adaptors/PatternTimeoutSelectAdapter.java      |   3 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java    |  84 ++++-----
 .../org/apache/flink/cep/operator/CepOperator.java |  31 ++-
 .../cep/pattern/conditions/IterativeCondition.java |   5 +-
 .../TimerContext.java => time/TimeContext.java}    |  10 +-
 .../org/apache/flink/cep/time/TimerService.java    |  15 +-
 .../apache/flink/cep/nfa/AfterMatchSkipITCase.java | 110 +++++------
 .../org/apache/flink/cep/nfa/GreedyITCase.java     |   4 +-
 .../java/org/apache/flink/cep/nfa/GroupITCase.java |   8 +-
 .../flink/cep/nfa/IterativeConditionsITCase.java   |   4 +-
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   |  98 +++++-----
 .../nfa/NFAIterativeConditionTimeContextTest.java  |  91 +++++++++
 .../apache/flink/cep/nfa/NFAStateAccessTest.java   |  41 ++--
 .../flink/cep/nfa/NFAStatusChangeITCase.java       |  23 ++-
 .../java/org/apache/flink/cep/nfa/NFATest.java     | 148 +++++++--------
 .../org/apache/flink/cep/nfa/NotPatternITCase.java |   4 +-
 .../apache/flink/cep/nfa/SameElementITCase.java    |  11 +-
 .../apache/flink/cep/nfa/TimesOrMoreITCase.java    |   4 +-
 .../org/apache/flink/cep/nfa/TimesRangeITCase.java |   4 +-
 .../apache/flink/cep/nfa/UntilConditionITCase.java |  36 ++--
 .../apache/flink/cep/operator/CEPOperatorTest.java |  35 ++++
 .../operator/CepProcessFunctionContextTest.java    |  35 ++--
 .../org/apache/flink/cep/utils/NFATestHarness.java | 208 +++++++++++++++++++++
 .../flink/cep/{nfa => utils}/NFATestUtilities.java |  60 +-----
 .../apache/flink/cep/utils/TestTimerService.java   |  26 ++-
 29 files changed, 722 insertions(+), 426 deletions(-)
 rename flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/{context/TimerContext.java => time/TimeContext.java} (83%)
 copy flink-core/src/main/java/org/apache/flink/core/memory/MemoryType.java => flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java (71%)
 create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimeContextTest.java
 create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
 rename flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/{nfa => utils}/NFATestUtilities.java (57%)
 copy flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/stats/DisabledKvStateRequestStats.java => flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java (62%)


[flink] 03/05: [FLINK-10596][cep] Added access to timer service in IterativeCondition

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60f8826de75bdb1294ca4cd26b2af155de62c0df
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 12 09:17:40 2018 +0100

    [FLINK-10596][cep] Added access to timer service in IterativeCondition
---
 ...ernStreamScalaJavaAPIInteroperabilityTest.scala |  2 +-
 .../cep/functions/PatternProcessFunction.java      |  2 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java    | 84 ++++++++++----------
 .../org/apache/flink/cep/operator/CepOperator.java | 20 ++++-
 .../cep/pattern/conditions/IterativeCondition.java |  5 +-
 .../flink/cep/{context => time}/TimerContext.java  |  6 +-
 .../TimerContext.java => time/TimerService.java}   | 22 ++----
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   | 13 +++-
 .../nfa/NFAIterativeConditionTimerContextTest.java | 91 ++++++++++++++++++++++
 .../flink/cep/nfa/NFAStatusChangeITCase.java       | 21 ++---
 .../apache/flink/cep/operator/CEPOperatorTest.java | 35 +++++++++
 .../org/apache/flink/cep/utils/NFATestHarness.java | 35 ++++++---
 .../apache/flink/cep/utils/TestTimerService.java}  | 36 ++++-----
 13 files changed, 266 insertions(+), 106 deletions(-)

diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
index 5f6055d..80d96ac 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -150,7 +150,7 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
       outputs.getOrElseUpdate(outputTag, ListBuffer.empty).append(value)
     }
 
-    override def timestamp(): lang.Long = null
+    override def timestamp(): Long = 0
 
     override def currentProcessingTime(): Long = System.currentTimeMillis()
   }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
index b392501..b8e6821 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
@@ -20,7 +20,7 @@ package org.apache.flink.cep.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.cep.context.TimerContext;
+import org.apache.flink.cep.time.TimerContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 48bb587..869a641 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -38,6 +38,7 @@ import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -211,32 +212,8 @@ public class NFA<T> {
 	 * @param nfaState The NFAState object that we need to affect while processing
 	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
-	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
-	 * reached a final state) and the collection of timed out patterns (if timeout handling is
-	 * activated)
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	public Collection<Map<String, List<T>>> process(
-			final SharedBufferAccessor<T> sharedBufferAccessor,
-			final NFAState nfaState,
-			final T event,
-			final long timestamp) throws Exception {
-		return process(sharedBufferAccessor, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
-	}
-
-	/**
-	 * Processes the next input event. If some of the computations reach a final state then the
-	 * resulting event sequences are returned. If computations time out and timeout handling is
-	 * activated, then the timed out event patterns are returned.
-	 *
-	 * <p>If computations reach a stop state, the path forward is discarded and currently constructed path is returned
-	 * with the element that resulted in the stop state.
-	 *
-	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
-	 * @param nfaState The NFAState object that we need to affect while processing
-	 * @param event The current event to be processed or null if only pruning shall be done
-	 * @param timestamp The timestamp of the current event
 	 * @param afterMatchSkipStrategy The skip strategy to use after per match
+	 * @param timerService gives access to processing time and time characteristic, needed for condition evaluation
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
 	 * reached a final state) and the collection of timed out patterns (if timeout handling is
 	 * activated)
@@ -247,9 +224,10 @@ public class NFA<T> {
 			final NFAState nfaState,
 			final T event,
 			final long timestamp,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+			final AfterMatchSkipStrategy afterMatchSkipStrategy,
+			final TimerService timerService) throws Exception {
 		try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
-			return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy);
+			return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy, timerService);
 		}
 	}
 
@@ -305,7 +283,8 @@ public class NFA<T> {
 			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final EventWrapper event,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+			final AfterMatchSkipStrategy afterMatchSkipStrategy,
+			final TimerService timerService) throws Exception {
 
 		final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 		final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
@@ -316,7 +295,7 @@ public class NFA<T> {
 				sharedBufferAccessor,
 				computationState,
 				event,
-				event.getTimestamp());
+				timerService);
 
 			if (newComputationStates.size() != 1) {
 				nfaState.setStateChanged();
@@ -555,7 +534,7 @@ public class NFA<T> {
 	 * @param sharedBufferAccessor The accessor to shared buffer that we need to change
 	 * @param computationState Current computation state
 	 * @param event Current event which is processed
-	 * @param timestamp Timestamp of the current event
+	 * @param timerService timer service which provides access to time related features
 	 * @return Collection of computation states which result from the current one
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
@@ -563,9 +542,13 @@ public class NFA<T> {
 			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final ComputationState computationState,
 			final EventWrapper event,
-			final long timestamp) throws Exception {
+			final TimerService timerService) throws Exception {
 
-		final ConditionContext<T> context = new ConditionContext<>(this, sharedBufferAccessor, computationState);
+		final ConditionContext context = new ConditionContext(
+			sharedBufferAccessor,
+			computationState,
+			timerService,
+			event.getTimestamp());
 
 		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
 
@@ -628,7 +611,7 @@ public class NFA<T> {
 					final long startTimestamp;
 					final EventId startEventId;
 					if (isStartState(computationState)) {
-						startTimestamp = timestamp;
+						startTimestamp = event.getTimestamp();
 						startEventId = event.getEventId();
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
@@ -694,7 +677,7 @@ public class NFA<T> {
 	}
 
 	private State<T> findFinalStateAfterProceed(
-			ConditionContext<T> context,
+			ConditionContext context,
 			State<T> state,
 			T event) {
 		final Stack<State<T>> statesToCheck = new Stack<>();
@@ -725,7 +708,7 @@ public class NFA<T> {
 	}
 
 	private OutgoingEdges<T> createDecisionGraph(
-			ConditionContext<T> context,
+			ConditionContext context,
 			ComputationState computationState,
 			T event) {
 		State<T> state = getState(computationState);
@@ -765,7 +748,7 @@ public class NFA<T> {
 	}
 
 	private boolean checkFilterCondition(
-			ConditionContext<T> context,
+			ConditionContext context,
 			IterativeCondition<T> condition,
 			T event) throws Exception {
 		return condition == null || condition.filter(event, context);
@@ -804,7 +787,11 @@ public class NFA<T> {
 	/**
 	 * The context used when evaluating this computation state.
 	 */
-	private static class ConditionContext<T> implements IterativeCondition.Context<T> {
+	private class ConditionContext implements IterativeCondition.Context<T> {
+
+		private final TimerService timerService;
+
+		private final long eventTimestamp;
 
 		/** The current computation state. */
 		private ComputationState computationState;
@@ -816,17 +803,17 @@ public class NFA<T> {
 		 */
 		private Map<String, List<T>> matchedEvents;
 
-		private NFA<T> nfa;
-
 		private SharedBufferAccessor<T> sharedBufferAccessor;
 
 		ConditionContext(
-				final NFA<T> nfa,
 				final SharedBufferAccessor<T> sharedBufferAccessor,
-				final ComputationState computationState) {
+				final ComputationState computationState,
+				final TimerService timerService,
+				final long eventTimestamp) {
 			this.computationState = computationState;
-			this.nfa = nfa;
 			this.sharedBufferAccessor = sharedBufferAccessor;
+			this.timerService = timerService;
+			this.eventTimestamp = eventTimestamp;
 		}
 
 		@Override
@@ -837,7 +824,8 @@ public class NFA<T> {
 			// this is to avoid any overheads when using a simple, non-iterative condition.
 
 			if (matchedEvents == null) {
-				this.matchedEvents = sharedBufferAccessor.materializeMatch(nfa.extractCurrentMatches(sharedBufferAccessor,
+				this.matchedEvents = sharedBufferAccessor.materializeMatch(extractCurrentMatches(
+					sharedBufferAccessor,
 					computationState));
 			}
 
@@ -851,6 +839,16 @@ public class NFA<T> {
 				}
 			};
 		}
+
+		@Override
+		public long timestamp() {
+			return eventTimestamp;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return timerService.currentProcessingTime();
+		}
 	}
 
 	////////////////////				DEPRECATED/MIGRATION UTILS
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 262d89d..357fbc7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -40,6 +40,7 @@ import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -130,6 +131,9 @@ public class CepOperator<IN, KEY, OUT>
 	/** Wrapped RuntimeContext that limits the underlying context features. */
 	private transient CepRuntimeContext cepRuntimeContext;
 
+	/** Thin context passed to NFA that gives access to time related characteristics. */
+	private transient TimerService cepTimerService;
+
 	public CepOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
@@ -216,6 +220,7 @@ public class CepOperator<IN, KEY, OUT>
 
 		context = new ContextFunctionImpl();
 		collector = new TimestampedCollector<>(output);
+		cepTimerService = new TimerServiceImpl();
 	}
 
 	@Override
@@ -415,7 +420,7 @@ public class CepOperator<IN, KEY, OUT>
 	private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
 		try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
 			Collection<Map<String, List<IN>>> patterns =
-				nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy);
+				nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService);
 			processMatchedSequences(patterns, timestamp);
 		}
 	}
@@ -465,6 +470,19 @@ public class CepOperator<IN, KEY, OUT>
 	}
 
 	/**
+	 * Gives {@link NFA} access to {@link InternalTimerService} and tells if {@link CepOperator} works in
+	 * processing time. Should be instantiated once per operator.
+	 */
+	private class TimerServiceImpl implements TimerService {
+
+		@Override
+		public long currentProcessingTime() {
+			return timerService.currentProcessingTime();
+		}
+
+	}
+
+	/**
 	 * Implementation of {@link PatternProcessFunction.Context}. Design to be instantiated once per operator.
 	 * It serves three methods:
 	 *  <ul>
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index f716ec4..786cb08 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.cep.time.TimerContext;
 
 import java.io.Serializable;
 
@@ -59,6 +61,7 @@ import java.io.Serializable;
  * the elements that belong to the pattern among the elements stored by the NFA. The cost of this operation can vary,
  * so when implementing your condition, try to minimize the times the method is called.
  */
+@PublicEvolving
 public abstract class IterativeCondition<T> implements Function, Serializable {
 
 	private static final long serialVersionUID = 7067817235759351255L;
@@ -84,7 +87,7 @@ public abstract class IterativeCondition<T> implements Function, Serializable {
 	/**
 	 * The context used when evaluating the {@link IterativeCondition condition}.
 	 */
-	public interface Context<T> {
+	public interface Context<T> extends TimerContext {
 
 		/**
 		 * @return An {@link Iterable} over the already accepted elements
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
similarity index 91%
copy from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
copy to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
index 4f0eab2..9b5db52 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.time;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.cep.functions.PatternProcessFunction;
@@ -32,8 +32,8 @@ public interface TimerContext {
 	/**
 	 * Timestamp of the element currently being processed.
 	 *
-	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
-	 * time when event entered the cep operator.
+	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this means the
+	 * time when the event entered the cep operator.
 	 */
 	long timestamp();
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
similarity index 58%
copy from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
copy to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
index 4f0eab2..464e267 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
@@ -16,28 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.time;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.annotation.Internal;
 
 /**
- * Enables access to time related characteristics such as current processing time or timestamp of currently processed
- * element. Used in {@link PatternProcessFunction} and
- * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
+ * Provides time characteristics to {@link org.apache.flink.cep.nfa.NFA} for use in
+ * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}.
  */
-@PublicEvolving
-public interface TimerContext {
+@Internal
+public interface TimerService {
 
 	/**
-	 * Timestamp of the element currently being processed.
-	 *
-	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
-	 * time when event entered the cep operator.
+	 * Current processing time as returned from {@link org.apache.flink.streaming.api.TimerService}.
 	 */
-	long timestamp();
-
-	/** Returns the current processing time. */
 	long currentProcessingTime();
 
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 62d76db..4b1f627 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.TestTimerService;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
@@ -420,7 +421,8 @@ public class NFAITCase extends TestLogger {
 					nfaState,
 					event.getValue(),
 					event.getTimestamp(),
-					AfterMatchSkipStrategy.noSkip());
+					AfterMatchSkipStrategy.noSkip(),
+					new TestTimerService());
 
 			resultingPatterns.addAll(matchedPatterns);
 			resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2830,10 +2832,13 @@ public class NFAITCase extends TestLogger {
 		Event a = new Event(40, "a", 1.0);
 		Event b = new Event(41, "b", 2.0);
 
+		NFA<Event> nfa = compile(pattern, false);
+		TestTimerService timerService = new TestTimerService();
 		try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
-			NFA<Event> nfa = compile(pattern, false);
-			nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip());
-			nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip());
+			nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip(),
+				timerService);
+			nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip(),
+				timerService);
 			Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
 			nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
 			Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
new file mode 100644
index 0000000..3f9b26a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
+import org.apache.flink.cep.utils.TestTimerService;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cep.utils.EventBuilder.event;
+import static org.apache.flink.cep.utils.NFATestHarness.forPattern;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+
+/**
+ * Tests for accessing time properties from {@link IterativeCondition}.
+ */
+public class NFAIterativeConditionTimerContextTest extends TestLogger {
+
+	@Test
+	public void testEventTimestamp() throws Exception {
+		final Event event = event().withId(1).build();
+		final long timestamp = 3;
+
+		final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				return ctx.timestamp() == timestamp;
+			}
+		});
+
+		final NFATestHarness testHarness = forPattern(pattern).build();
+
+		final List<List<Event>> resultingPattern = testHarness.feedRecord(new StreamRecord<>(event, timestamp));
+
+		compareMaps(resultingPattern, Collections.singletonList(
+			Collections.singletonList(event)
+		));
+	}
+
+	@Test
+	public void testCurrentProcessingTime() throws Exception {
+		final Event event1 = event().withId(1).build();
+		final Event event2 = event().withId(2).build();
+
+		final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				return ctx.currentProcessingTime() == 3;
+			}
+		});
+
+		final TestTimerService cepTimerService = new TestTimerService();
+		final NFATestHarness testHarness = forPattern(pattern)
+			.withTimerService(cepTimerService)
+			.build();
+
+		cepTimerService.setCurrentProcessingTime(1);
+		final List<List<Event>> resultingPatterns1 = testHarness.feedRecord(new StreamRecord<>(event1, 7));
+		cepTimerService.setCurrentProcessingTime(3);
+		final List<List<Event>> resultingPatterns2 = testHarness.feedRecord(new StreamRecord<>(event2, 8));
+
+		compareMaps(resultingPatterns1, Collections.emptyList());
+		compareMaps(resultingPatterns2, Collections.singletonList(
+			Collections.singletonList(event2)
+		));
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index d0a7853..c5e6682 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -26,7 +26,9 @@ import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.TestTimerService;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import org.junit.After;
@@ -49,6 +51,7 @@ public class NFAStatusChangeITCase {
 	private SharedBuffer<Event> sharedBuffer;
 	private SharedBufferAccessor<Event> sharedBufferAccessor;
 	private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
+	private TimerService timerService = new TestTimerService();
 
 	@Before
 	public void init() {
@@ -97,32 +100,32 @@ public class NFAStatusChangeITCase {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy, timerService);
 		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
 
 		// the status of the queue of ComputationStatus changed,
 		// more than one ComputationStatus is generated by the event from some ComputationStatus
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy, timerService);
 		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
@@ -166,10 +169,10 @@ public class NFAStatusChangeITCase {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy, timerService);
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy, timerService);
 		assertTrue(nfaState.isStateChanged());
 	}
 
@@ -195,7 +198,7 @@ public class NFAStatusChangeITCase {
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy, timerService);
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 2e4b41d..f496dc8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -30,10 +30,14 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.functions.PatternProcessFunction;
 import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.NFAState;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.mock.Whitebox;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -64,10 +68,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import static org.apache.flink.cep.utils.CepOperatorBuilder.createOperatorForNFA;
+import static org.apache.flink.cep.utils.EventBuilder.event;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.validateMockitoUsage;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for {@link CepOperator}.
@@ -101,6 +111,31 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
+	public void testProcessingTimestampisPassedToNFA() throws Exception {
+
+		final NFA<Event> nfa = NFACompiler.compileFactory(Pattern.<Event>begin("begin"), true).createNFA();
+		final NFA<Event> spyNFA = spy(nfa);
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
+				CepOperatorTestUtilities.getCepTestHarness(createOperatorForNFA(spyNFA).build())) {
+
+			long timestamp = 5;
+			harness.open();
+			harness.setProcessingTime(timestamp);
+			StreamRecord<Event> event = event().withTimestamp(3).asStreamRecord();
+			harness.processElement(event);
+			verify(spyNFA).process(
+				any(SharedBufferAccessor.class),
+				any(NFAState.class),
+				eq(event.getValue()),
+				eq(timestamp),
+				any(AfterMatchSkipStrategy.class),
+				any(TimerService.class));
+		}
+	}
+
+	@Test
 	public void testKeyedCEPOperatorCheckpointing() throws Exception {
 
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
index dd080fb..f8e0f57 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.ArrayList;
@@ -41,16 +42,19 @@ public final class NFATestHarness {
 	private final NFA<Event> nfa;
 	private final NFAState nfaState;
 	private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+	private final TimerService timerService;
 
 	private NFATestHarness(
 			SharedBuffer<Event> sharedBuffer,
 			NFA<Event> nfa,
 			NFAState nfaState,
-			AfterMatchSkipStrategy afterMatchSkipStrategy) {
+			AfterMatchSkipStrategy afterMatchSkipStrategy,
+			TimerService timerService) {
 		this.sharedBuffer = sharedBuffer;
 		this.nfa = nfa;
 		this.nfaState = nfaState;
 		this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+		this.timerService = timerService;
 	}
 
 	/**
@@ -68,7 +72,7 @@ public final class NFATestHarness {
 	}
 
 	public List<List<Event>> feedRecords(List<StreamRecord<Event>> inputEvents) throws Exception {
-		List<List<Event>> resultingPatterns = new ArrayList<>();
+		final List<List<Event>> resultingPatterns = new ArrayList<>();
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			resultingPatterns.addAll(feedRecord(inputEvent));
 		}
@@ -76,8 +80,8 @@ public final class NFATestHarness {
 	}
 
 	public List<List<Event>> feedRecord(StreamRecord<Event> inputEvent) throws Exception {
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-		Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
+		final List<List<Event>> resultingPatterns = new ArrayList<>();
+		final Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
 		for (Map<String, List<Event>> p : matches) {
 			List<Event> res = new ArrayList<>();
 			for (List<Event> le : p.values()) {
@@ -89,7 +93,7 @@ public final class NFATestHarness {
 	}
 
 	public Collection<Map<String, List<Event>>> consumeRecords(Collection<StreamRecord<Event>> inputEvents) throws Exception {
-		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+		final List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			resultingPatterns.addAll(consumeRecord(inputEvent));
 		}
@@ -105,7 +109,8 @@ public final class NFATestHarness {
 				nfaState,
 				inputEvent.getValue(),
 				inputEvent.getTimestamp(),
-				afterMatchSkipStrategy);
+				afterMatchSkipStrategy,
+				timerService);
 		}
 	}
 
@@ -129,12 +134,13 @@ public final class NFATestHarness {
 
 		@Override
 		public NFATestHarness build() {
-			NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
+			final NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
 			return new NFATestHarness(
 				sharedBuffer,
 				nfa,
 				nfa.createInitialNFAState(),
-				afterMatchSkipStrategy);
+				afterMatchSkipStrategy,
+				timerService);
 		}
 	}
 
@@ -159,7 +165,12 @@ public final class NFATestHarness {
 
 		@Override
 		public NFATestHarness build() {
-			return new NFATestHarness(sharedBuffer, nfa, nfaState, afterMatchSkipStrategy);
+			return new NFATestHarness(
+				sharedBuffer,
+				nfa,
+				nfaState,
+				afterMatchSkipStrategy,
+				timerService);
 		}
 	}
 
@@ -171,6 +182,7 @@ public final class NFATestHarness {
 
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
 		AfterMatchSkipStrategy afterMatchSkipStrategy;
+		TimerService timerService = new TestTimerService();
 
 		NFATestHarnessBuilderBase(AfterMatchSkipStrategy skipStrategy) {
 			this.afterMatchSkipStrategy = skipStrategy;
@@ -186,6 +198,11 @@ public final class NFATestHarness {
 			return this;
 		}
 
+		public NFATestHarnessBuilderBase withTimerService(TimerService timerService) {
+			this.timerService = timerService;
+			return this;
+		}
+
 		public abstract NFATestHarness build();
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
similarity index 51%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
index 4f0eab2..5fd9755 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
@@ -16,28 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.utils;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.time.TimerService;
 
 /**
- * Enables access to time related characteristics such as current processing time or timestamp of currently processed
- * element. Used in {@link PatternProcessFunction} and
- * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
+ * Test implementation of {@link TimerService}. Provides setters for encapsulated properties.
  */
-@PublicEvolving
-public interface TimerContext {
-
-	/**
-	 * Timestamp of the element currently being processed.
-	 *
-	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
-	 * time when event entered the cep operator.
-	 */
-	long timestamp();
-
-	/** Returns the current processing time. */
-	long currentProcessingTime();
+public class TestTimerService implements TimerService {
+
+	public static final long UNDEFINED_TIME = Long.MIN_VALUE;
+
+	private Long processingTime = UNDEFINED_TIME;
+
+	@Override
+	public long currentProcessingTime() {
+		return processingTime;
+	}
+
+	public void setCurrentProcessingTime(long processingTime) {
+		this.processingTime = processingTime;
+	}
 
 }


[flink] 04/05: [hotfix][cep] Renamed TimerContext to TimeContext

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0a46510a7e87f350492f4e4889c006f87ae7c665
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jan 8 12:07:48 2019 +0100

    [hotfix][cep] Renamed TimerContext to TimeContext
---
 .../java/org/apache/flink/cep/functions/PatternProcessFunction.java   | 4 ++--
 .../org/apache/flink/cep/pattern/conditions/IterativeCondition.java   | 4 ++--
 .../org/apache/flink/cep/time/{TimerContext.java => TimeContext.java} | 2 +-
 ...imerContextTest.java => NFAIterativeConditionTimeContextTest.java} | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
index b8e6821..afc798f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
@@ -20,7 +20,7 @@ package org.apache.flink.cep.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.cep.time.TimerContext;
+import org.apache.flink.cep.time.TimeContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
@@ -64,7 +64,7 @@ public abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFuncti
 	/**
 	 * Gives access to time related characteristics as well as enables emitting elements to side outputs.
 	 */
-	public interface Context extends TimerContext {
+	public interface Context extends TimeContext {
 		/**
 		 * Emits a record to the side output identified by the {@link OutputTag}.
 		 *
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index 786cb08..ad45dab 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -20,7 +20,7 @@ package org.apache.flink.cep.pattern.conditions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.cep.time.TimerContext;
+import org.apache.flink.cep.time.TimeContext;
 
 import java.io.Serializable;
 
@@ -87,7 +87,7 @@ public abstract class IterativeCondition<T> implements Function, Serializable {
 	/**
 	 * The context used when evaluating the {@link IterativeCondition condition}.
 	 */
-	public interface Context<T> extends TimerContext {
+	public interface Context<T> extends TimeContext {
 
 		/**
 		 * @return An {@link Iterable} over the already accepted elements
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimeContext.java
similarity index 97%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
rename to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimeContext.java
index 9b5db52..55e43b8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimeContext.java
@@ -27,7 +27,7 @@ import org.apache.flink.cep.functions.PatternProcessFunction;
  * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
  */
 @PublicEvolving
-public interface TimerContext {
+public interface TimeContext {
 
 	/**
 	 * Timestamp of the element currently being processed.
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimeContextTest.java
similarity index 97%
rename from flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimeContextTest.java
index 3f9b26a..3ebc65f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimeContextTest.java
@@ -38,7 +38,7 @@ import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
 /**
  * Tests for accesing time properties from {@link IterativeCondition}.
  */
-public class NFAIterativeConditionTimerContextTest extends TestLogger {
+public class NFAIterativeConditionTimeContextTest extends TestLogger {
 
 	@Test
 	public void testEventTimestamp() throws Exception {


[flink] 01/05: [hotfix][cep] Change contract of cep TimerContext#timestamp to never return null

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5e4648715f97338bb8ee0c8cc790bdbecc5b4422
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Mon Jan 7 15:23:40 2019 +0100

    [hotfix][cep] Change contract of cep TimerContext#timestamp to never
    return null
---
 .../org/apache/flink/cep/context/TimerContext.java |  6 ++--
 .../adaptors/PatternTimeoutFlatSelectAdapter.java  |  3 +-
 .../adaptors/PatternTimeoutSelectAdapter.java      |  3 +-
 .../org/apache/flink/cep/operator/CepOperator.java | 11 +++----
 .../operator/CepProcessFunctionContextTest.java    | 35 ++++++++++++----------
 5 files changed, 28 insertions(+), 30 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
index 23367cd..4f0eab2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
@@ -32,10 +32,10 @@ public interface TimerContext {
 	/**
 	 * Timestamp of the element currently being processed.
 	 *
-	 * <p>This might be {@code null}, for example if the time characteristic of your program
-	 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
+	 * time when the event entered the CEP operator.
 	 */
-	Long timestamp();
+	long timestamp();
 
 	/** Returns the current processing time. */
 	long currentProcessingTime();
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
index ab9c97d..13d04bd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
@@ -78,8 +78,7 @@ public class PatternTimeoutFlatSelectAdapter<IN, OUT, T>
 			Map<String, List<IN>> match,
 			Context ctx) throws Exception {
 		sideCollector.setCtx(ctx);
-		long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
-		flatTimeoutFunction.timeout(match, timestamp, sideCollector);
+		flatTimeoutFunction.timeout(match, ctx.timestamp(), sideCollector);
 	}
 
 	/**
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
index 29b0cf7..81999a8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
@@ -71,8 +71,7 @@ public class PatternTimeoutSelectAdapter<IN, OUT, T>
 			final Map<String, List<IN>> match,
 			final Context ctx) throws Exception {
 
-		final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
-		final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp);
+		final T timedOutPatternResult = timeoutFunction.timeout(match, ctx.timestamp());
 
 		ctx.output(timedOutPartialMatchesTag, timedOutPatternResult);
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 6df1fa0..262d89d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -459,8 +459,9 @@ public class CepOperator<IN, KEY, OUT>
 	private void setTimestamp(long timestamp) {
 		if (!isProcessingTime) {
 			collector.setAbsoluteTimestamp(timestamp);
-			context.setTimestamp(timestamp);
 		}
+
+		context.setTimestamp(timestamp);
 	}
 
 	/**
@@ -493,12 +494,8 @@ public class CepOperator<IN, KEY, OUT>
 		}
 
 		@Override
-		public Long timestamp() {
-			if (isProcessingTime) {
-				return null;
-			} else {
-				return timestamp;
-			}
+		public long timestamp() {
+			return timestamp;
 		}
 
 		@Override
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
index 0947748..baefe84 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -45,8 +45,8 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
  */
 public class CepProcessFunctionContextTest extends TestLogger {
 
-	private static final boolean PROCESSING_TIME = false;
-	private static final boolean EVENT_TIME = true;
+	private static final boolean PROCESSING_TIME = true;
+	private static final boolean EVENT_TIME = false;
 
 	@Test
 	public void testTimestampPassingInEventTime() throws Exception {
@@ -56,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(1),
 					new NFAForwardingFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -81,15 +81,18 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(1),
 					new NFAForwardingFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
+			harness.setProcessingTime(1);
 			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.setProcessingTime(2);
 			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+			harness.setProcessingTime(3);
 
 			assertOutput(harness.getOutput())
-				.nextElementEquals("(NO_TIMESTAMP):A")
-				.nextElementEquals("(NO_TIMESTAMP):B")
+				.nextElementEquals("1:A")
+				.nextElementEquals("2:B")
 				.hasNoMoreElements();
 		}
 	}
@@ -102,7 +105,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(1),
 					new NFAForwardingFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(15);
@@ -125,7 +128,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(1),
 					new NFAForwardingFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(10);
@@ -150,7 +153,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(2, timedOut),
 					new NFATimingOutFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -181,7 +184,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(2, timedOut),
 					new NFATimingOutFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(3);
@@ -192,11 +195,11 @@ public class CepProcessFunctionContextTest extends TestLogger {
 			harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
 
 			assertOutput(harness.getOutput())
-				.nextElementEquals("(NO_TIMESTAMP):A:C")
+				.nextElementEquals("5:A:C")
 				.hasNoMoreElements();
 
 			assertOutput(harness.getSideOutput(timedOut))
-				.nextElementEquals("(NO_TIMESTAMP):C")
+				.nextElementEquals("15:C")
 				.hasNoMoreElements();
 		}
 	}
@@ -211,7 +214,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
 					new NFATimingOutFactory(),
-					PROCESSING_TIME))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -243,7 +246,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
 					new NFATimingOutFactory(),
-					EVENT_TIME))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(3);
@@ -290,7 +293,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 	 */
 	private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) {
 		return new AccessContextWithNames(stateNumber,
-			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+			context -> String.valueOf(context.timestamp()));
 	}
 
 	/**
@@ -310,7 +313,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 			OutputTag<String> timedOutTag) {
 		return new AccessContextWithNamesWithTimedOut(stateNumber,
 			timedOutTag,
-			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+			context -> String.valueOf(context.timestamp()));
 	}
 
 	/**


[flink] 05/05: [FLINK-10596][cep, docs] Added description of TimeContext to docs

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47e9ccf0d3686cfe3e3ffb8f38e606b0f3d0fe0e
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Jan 8 12:09:15 2019 +0100

    [FLINK-10596][cep, docs] Added description of TimeContext to docs
---
 docs/dev/libs/cep.md | 41 +++++++++++++++++++++++++++++++++++++++--
 1 file changed, 39 insertions(+), 2 deletions(-)

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ab44b5c..a358ba2 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -311,6 +311,8 @@ middle.oneOrMore()
 previously accepted events for a given potential match. The cost of this operation can vary, so when implementing
 your condition, try to minimize its use.
 
+The described context also gives access to event time characteristics. For more information, see [Time context](#time-context).
+
 **Simple Conditions:** This type of condition extends the aforementioned `IterativeCondition` class and decides
 whether to accept an event or not, based *only* on properties of the event itself.
 
@@ -1508,6 +1510,7 @@ class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT>
 
 The `PatternProcessFunction` gives access to a `Context` object. Thanks to it, one can access time related
 characteristics such as `currentProcessingTime` or `timestamp` of current match (which is the timestamp of the last element assigned to the match).
+For more info see [Time context](#time-context).
 Through this context one can also emit results to a [side-output]({{ site.baseurl }}/dev/stream/side_output.html).
 
 
@@ -1594,7 +1597,9 @@ val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
 </div>
 </div>
 
-## Handling Lateness in Event Time
+## Time in CEP library
+
+### Handling Lateness in Event Time
 
 In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.
 
@@ -1620,7 +1625,6 @@ SingleOutputStreamOperator<ComplexEvent> result = patternStream
 
 DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);
 
-
 {% endhighlight %}
 
 </div>
@@ -1646,6 +1650,39 @@ val lateData: DataStream<String> = result.getSideOutput(lateDataOutputTag)
 </div>
 </div>
 
+### Time context
+
+In [PatternProcessFunction](#selecting-from-patterns) as well as in [IterativeCondition](#conditions), the user has access to a context
+that implements `TimeContext`, which is defined as follows:
+
+{% highlight java %}
+/**
+ * Enables access to time related characteristics such as current processing time or timestamp of
+ * currently processed element. Used in {@link PatternProcessFunction} and
+ * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
+ */
+@PublicEvolving
+public interface TimeContext {
+
+	/**
+	 * Timestamp of the element currently being processed.
+	 *
+	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this
+	 * will be set to the time when the event entered the CEP operator.
+	 */
+	long timestamp();
+
+	/** Returns the current processing time. */
+	long currentProcessingTime();
+}
+{% endhighlight %}
+
+This context gives the user access to the time characteristics of processed events (incoming records in the case of `IterativeCondition` and matches in the case of `PatternProcessFunction`).
+A call to `TimeContext#currentProcessingTime` always returns the current processing time, and this call should be preferred over, e.g., calling `System.currentTimeMillis()`.
+
+In the case of `TimeContext#timestamp()`, the returned value is equal to the assigned timestamp when using `EventTime`. In `ProcessingTime`, it is equal to the point in time when the event entered
+the CEP operator (or when the match was generated in the case of `PatternProcessFunction`). This means that the value will be consistent across multiple calls to that method.
+
 ## Examples
 
 The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data


[flink] 02/05: [hotfix][cep] Introduced nfa test harness

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit abe7ae23d5dc969b9034b206b8d492903c790b30
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 12 09:16:07 2018 +0100

    [hotfix][cep] Introduced nfa test harness
---
 .../apache/flink/cep/nfa/AfterMatchSkipITCase.java | 110 ++++++------
 .../org/apache/flink/cep/nfa/GreedyITCase.java     |   4 +-
 .../java/org/apache/flink/cep/nfa/GroupITCase.java |   8 +-
 .../flink/cep/nfa/IterativeConditionsITCase.java   |   4 +-
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   |  91 ++++------
 .../apache/flink/cep/nfa/NFAStateAccessTest.java   |  41 ++---
 .../flink/cep/nfa/NFAStatusChangeITCase.java       |  20 ++-
 .../java/org/apache/flink/cep/nfa/NFATest.java     | 148 +++++++---------
 .../org/apache/flink/cep/nfa/NotPatternITCase.java |   4 +-
 .../apache/flink/cep/nfa/SameElementITCase.java    |  11 +-
 .../apache/flink/cep/nfa/TimesOrMoreITCase.java    |   4 +-
 .../org/apache/flink/cep/nfa/TimesRangeITCase.java |   4 +-
 .../apache/flink/cep/nfa/UntilConditionITCase.java |  36 ++--
 .../org/apache/flink/cep/utils/NFATestHarness.java | 191 +++++++++++++++++++++
 .../flink/cep/{nfa => utils}/NFATestUtilities.java |  60 +------
 15 files changed, 417 insertions(+), 319 deletions(-)

diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
index f6bf0a8..89f3bf8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/AfterMatchSkipITCase.java
@@ -22,10 +22,10 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.aftermatch.SkipPastLastStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -40,9 +40,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
-import static org.apache.flink.cep.utils.NFAUtils.compile;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -77,9 +75,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(3);
 
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3),
@@ -141,9 +139,11 @@ public class AfterMatchSkipITCase extends TestLogger{
 					}
 				});
 
-			NFA<Event> nfa = compile(pattern, false);
+			NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+				.withAfterMatchSkipStrategy(skipStrategy)
+				.build();
 
-			return feedNFA(streamEvents, nfa, skipStrategy);
+			return nfaTestHarness.feedRecords(streamEvents);
 		}
 	}
 
@@ -198,9 +198,11 @@ public class AfterMatchSkipITCase extends TestLogger{
 					}
 				}).oneOrMore();
 
-			NFA<Event> nfa = compile(pattern, false);
+			NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+				.withAfterMatchSkipStrategy(skipStrategy)
+				.build();
 
-			return feedNFA(streamEvents, nfa, skipStrategy);
+			return nfaTestHarness.feedRecords(streamEvents);
 		}
 	}
 
@@ -231,9 +233,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(3);
 
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3),
@@ -275,9 +277,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(2);
 
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, ab2, ab3, ab4),
@@ -318,9 +320,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("b");
 			}
 		}).times(2);
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, ab2, ab3, ab4),
@@ -376,9 +378,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("d");
 				}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Collections.singletonList(
 			Lists.newArrayList(a1, b1, c1, d1)
@@ -415,9 +417,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}
 		);
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(a2, b2)
@@ -459,9 +461,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("c");
 			}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, c1),
@@ -498,9 +500,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("c");
 			}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(ab1, c1),
@@ -543,9 +545,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("b");
 			}
 		}).oneOrMore().consecutive();
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, b1),
@@ -573,9 +575,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}
 		);
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		//skip to first element of a match should throw exception if they are enabled,
 		//this mode is used in MATCH RECOGNIZE which assumes that skipping to first element
@@ -645,9 +647,11 @@ public class AfterMatchSkipITCase extends TestLogger{
 						return value.getName().contains("c");
 					}
 				});
-			NFA<Event> nfa = compile(pattern, false);
+			NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern)
+				.withAfterMatchSkipStrategy(skipStrategy)
+				.build();
 
-			return feedNFA(streamEvents, nfa, skipStrategy);
+			return nfaTestHarness.feedRecords(streamEvents);
 		}
 	}
 
@@ -686,9 +690,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 				return value.getName().contains("b");
 			}
 		}).oneOrMore().consecutive();
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, b1),
@@ -728,9 +732,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Collections.singletonList(
 			Lists.newArrayList(a1, a2, a3, b1)
@@ -768,9 +772,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3, b1),
@@ -809,9 +813,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3, b1),
@@ -851,9 +855,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("b");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, a2, a3, b1),
@@ -913,9 +917,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					return value.getName().contains("d");
 				}
 			});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a, b, c1, c2, c3, d),
@@ -962,9 +966,9 @@ public class AfterMatchSkipITCase extends TestLogger{
 					ctx.getEventsForPattern("a").iterator().next().getPrice() == value.getPrice();
 			}
 		});
-		NFA<Event> nfa = compile(pattern, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
 
-		List<List<Event>> resultingPatterns = feedNFA(streamEvents, nfa, pattern.getAfterMatchSkipStrategy());
+		List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(streamEvents);
 
 		compareMaps(resultingPatterns, Lists.newArrayList(
 			Lists.newArrayList(a1, c1, b2),
@@ -991,22 +995,10 @@ public class AfterMatchSkipITCase extends TestLogger{
 				}
 			}).times(2);
 
-		NFA<Event> nfa = compile(pattern, false);
-
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		NFAState nfaState = nfa.createInitialNFAState();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-				nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
-				nfa.process(
-					sharedBufferAccessor,
-					nfaState,
-					inputEvent.getValue(),
-					inputEvent.getTimestamp(),
-					matchSkipStrategy);
-			}
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+
+		nfaTestHarness.feedRecords(inputEvents);
 
 		assertThat(sharedBuffer.isEmpty(), Matchers.is(true));
 	}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
index 9e00130..e33e59a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GreedyITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
index 93116ff..9387966 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/GroupITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.GroupPattern;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -32,8 +33,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
@@ -1077,7 +1078,8 @@ public class GroupITCase extends TestLogger {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(c, a1, b1, d),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
index 9012379..ce28ddd 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
@@ -33,8 +33,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 2391d54..62d76db 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -21,11 +21,13 @@ package org.apache.flink.cep.nfa;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -48,8 +50,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 import static org.mockito.Matchers.anyLong;
@@ -414,7 +416,11 @@ public class NFAITCase extends TestLogger {
 			Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
 				nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp());
 			Collection<Map<String, List<Event>>> matchedPatterns =
-				nfa.process(sharedBufferAccessor, nfaState, event.getValue(), event.getTimestamp());
+				nfa.process(sharedBufferAccessor,
+					nfaState,
+					event.getValue(),
+					event.getTimestamp(),
+					AfterMatchSkipStrategy.noSkip());
 
 			resultingPatterns.addAll(matchedPatterns);
 			resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2342,11 +2348,13 @@ public class NFAITCase extends TestLogger {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 2);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 3);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent3, 4);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+
+		nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent1, 2));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent2, 3));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent3, 4));
+		nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2387,10 +2395,11 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent, 5);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		nfaTestHarness.feedRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.feedRecord(new StreamRecord<>(middleEvent, 5));
+		nfaTestHarness.feedRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2432,11 +2441,12 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2478,11 +2488,12 @@ public class NFAITCase extends TestLogger {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
-		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
-		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
+		nfaTestHarness.consumeRecord(new StreamRecord<>(startEvent, 1));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent1, 3));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(middleEvent2, 4));
+		nfaTestHarness.consumeRecord(new StreamRecord<>(end1, 6));
 
 		//pruning element
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
@@ -2734,25 +2745,12 @@ public class NFAITCase extends TestLogger {
 					}
 				}).times(3).consecutive();
 
-		NFA<Event> nfa = compile(pattern, false);
-
-		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
-
-		NFAState nfaState = nfa.createInitialNFAState();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				sharedBufferAccessor,
-				nfaState,
-				inputEvent.getValue(),
-				inputEvent.getTimestamp());
-
-			resultingPatterns.addAll(patterns);
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
+		Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
 
 		Assert.assertEquals(1L, resultingPatterns.size());
 
-		Map<String, List<Event>> match = resultingPatterns.get(0);
+		Map<String, List<Event>> match = resultingPatterns.iterator().next();
 		Assert.assertArrayEquals(
 				match.get("start").toArray(),
 				Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray());
@@ -2809,25 +2807,12 @@ public class NFAITCase extends TestLogger {
 				}
 			});
 
-		NFA<Event> nfa = compile(pattern, false);
-
-		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
-
-		NFAState nfaState = nfa.createInitialNFAState();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				sharedBufferAccessor,
-				nfaState,
-				inputEvent.getValue(),
-				inputEvent.getTimestamp());
-
-			resultingPatterns.addAll(patterns);
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).build();
+		Collection<Map<String, List<Event>>> resultingPatterns = nfaTestHarness.consumeRecords(inputEvents);
 
 		Assert.assertEquals(1L, resultingPatterns.size());
 
-		Map<String, List<Event>> match = resultingPatterns.get(0);
+		Map<String, List<Event>> match = resultingPatterns.iterator().next();
 
 		List<String> expectedOrder = Lists.newArrayList("a", "b", "aa", "bb", "ab");
 		List<String> resultOrder = new ArrayList<>();
@@ -2847,8 +2832,8 @@ public class NFAITCase extends TestLogger {
 
 		try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
 			NFA<Event> nfa = compile(pattern, false);
-			nfa.process(accessor, nfa.createInitialNFAState(), a, 1);
-			nfa.process(accessor, nfa.createInitialNFAState(), b, 2);
+			nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip());
+			nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip());
 			Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
 			nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
 			Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
index 227fe4e..1be5f4f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
@@ -20,10 +20,10 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -32,7 +32,6 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -99,22 +98,13 @@ public class NFAStateAccessTest {
 			}
 		});
 
-		NFA<Event> nfa = compile(pattern, false);
-
 		TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
-					nfa.process(
-					accessor,
-					nfa.createInitialNFAState(),
-					inputEvent.getValue(),
-					inputEvent.getTimestamp());
-			}
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+		nfaTestHarness.consumeRecords(inputEvents);
 
-		assertEquals(2, sharedBuffer.getStateReads());
-		assertEquals(3, sharedBuffer.getStateWrites());
-		assertEquals(5, sharedBuffer.getStateAccesses());
+		assertEquals(58, sharedBuffer.getStateReads());
+		assertEquals(33, sharedBuffer.getStateWrites());
+		assertEquals(91, sharedBuffer.getStateAccesses());
 	}
 
 	@Test
@@ -182,21 +172,12 @@ public class NFAStateAccessTest {
 			}
 		});
 
-		NFA<Event> nfa = compile(pattern, false);
-
 		TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
-					nfa.process(
-					accessor,
-					nfa.createInitialNFAState(),
-					inputEvent.getValue(),
-					inputEvent.getTimestamp());
-			}
-		}
+		NFATestHarness nfaTestHarness = NFATestHarness.forPattern(pattern).withSharedBuffer(sharedBuffer).build();
+		nfaTestHarness.consumeRecords(inputEvents);
 
-		assertEquals(8, sharedBuffer.getStateReads());
-		assertEquals(12, sharedBuffer.getStateWrites());
-		assertEquals(20, sharedBuffer.getStateAccesses());
+		assertEquals(90, sharedBuffer.getStateReads());
+		assertEquals(31, sharedBuffer.getStateWrites());
+		assertEquals(121, sharedBuffer.getStateAccesses());
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index 8cbfba1..d0a7853 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.nfa;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
@@ -47,6 +48,7 @@ public class NFAStatusChangeITCase {
 
 	private SharedBuffer<Event> sharedBuffer;
 	private SharedBufferAccessor<Event> sharedBufferAccessor;
+	private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
 
 	@Before
 	public void init() {
@@ -95,32 +97,32 @@ public class NFAStatusChangeITCase {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy);
 		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
 
 		// the status of the queue of ComputationStatus changed,
 		// more than one ComputationStatus is generated by the event from some ComputationStatus
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy);
 		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
@@ -164,10 +166,10 @@ public class NFAStatusChangeITCase {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy);
 		assertTrue(nfaState.isStateChanged());
 	}
 
@@ -193,7 +195,7 @@ public class NFAStatusChangeITCase {
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index e5f4c9e..638b8b5 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -19,13 +19,11 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
-import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -39,10 +37,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
@@ -91,9 +87,7 @@ public class NFATest extends TestLogger {
 		states.add(endState);
 		states.add(endingState);
 
-		NFA<Event> nfa = new NFA<>(states, 0, false);
-
-		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+		List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
 
 		Map<String, List<Event>> firstPattern = new HashMap<>();
 		firstPattern.put("start", Collections.singletonList(new Event(1, "start", 1.0)));
@@ -106,14 +100,16 @@ public class NFATest extends TestLogger {
 		expectedPatterns.add(firstPattern);
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		NFA<Event> nfa = new NFA<>(states, 0, false);
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
 
 	@Test
 	public void testTimeoutWindowPruning() throws Exception {
-		NFA<Event> nfa = createStartEndNFA();
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -121,7 +117,7 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "start", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 4L));
 
-		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+		List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
 
 		Map<String, List<Event>> secondPattern = new HashMap<>();
 		secondPattern.put("start", Collections.singletonList(new Event(3, "start", 3.0)));
@@ -129,7 +125,10 @@ public class NFATest extends TestLogger {
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		NFA<Event> nfa = createStartEndNFA();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -140,15 +139,17 @@ public class NFATest extends TestLogger {
 	 */
 	@Test
 	public void testWindowBorders() throws Exception {
-		NFA<Event> nfa = createStartEndNFA();
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
 		streamEvents.add(new StreamRecord<>(new Event(2, "end", 2.0), 3L));
 
-		Set<Map<String, List<Event>>> expectedPatterns = Collections.emptySet();
+		List<Map<String, List<Event>>> expectedPatterns = Collections.emptyList();
+
+		NFA<Event> nfa = createStartEndNFA();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
 		assertEquals(expectedPatterns, actualPatterns);
 	}
@@ -159,7 +160,6 @@ public class NFATest extends TestLogger {
 	 */
 	@Test
 	public void testTimeoutWindowPruningWindowBorders() throws Exception {
-		NFA<Event> nfa = createStartEndNFA();
 		List<StreamRecord<Event>> streamEvents = new ArrayList<>();
 
 		streamEvents.add(new StreamRecord<>(new Event(1, "start", 1.0), 1L));
@@ -167,7 +167,7 @@ public class NFATest extends TestLogger {
 		streamEvents.add(new StreamRecord<>(new Event(3, "foobar", 3.0), 3L));
 		streamEvents.add(new StreamRecord<>(new Event(4, "end", 4.0), 3L));
 
-		Set<Map<String, List<Event>>> expectedPatterns = new HashSet<>();
+		List<Map<String, List<Event>>> expectedPatterns = new ArrayList<>();
 
 		Map<String, List<Event>> secondPattern = new HashMap<>();
 		secondPattern.put("start", Collections.singletonList(new Event(2, "start", 2.0)));
@@ -175,30 +175,12 @@ public class NFATest extends TestLogger {
 
 		expectedPatterns.add(secondPattern);
 
-		Collection<Map<String, List<Event>>> actualPatterns = runNFA(nfa, nfa.createInitialNFAState(), streamEvents);
+		NFA<Event> nfa = createStartEndNFA();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
 
-		assertEquals(expectedPatterns, actualPatterns);
-	}
+		Collection<Map<String, List<Event>>> actualPatterns = nfaTestHarness.consumeRecords(streamEvents);
 
-	public Collection<Map<String, List<Event>>> runNFA(
-		NFA<Event> nfa, NFAState nfaState, List<StreamRecord<Event>> inputs) throws Exception {
-		Set<Map<String, List<Event>>> actualPatterns = new HashSet<>();
-
-		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-			for (StreamRecord<Event> streamEvent : inputs) {
-				nfa.advanceTime(sharedBufferAccessor, nfaState, streamEvent.getTimestamp());
-				Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
-					sharedBufferAccessor,
-					nfaState,
-					streamEvent.getValue(),
-					streamEvent.getTimestamp());
-
-				actualPatterns.addAll(matchedPatterns);
-			}
-		}
-
-		return actualPatterns;
+		assertEquals(expectedPatterns, actualPatterns);
 	}
 
 	@Test
@@ -289,51 +271,49 @@ public class NFATest extends TestLogger {
 		patterns.add(pattern2);
 		patterns.add(pattern3);
 
-		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-
-			for (Pattern<Event, ?> p : patterns) {
-				NFA<Event> nfa = compile(p, false);
-
-				Event a = new Event(40, "a", 1.0);
-				Event b = new Event(41, "b", 2.0);
-				Event c = new Event(42, "c", 3.0);
-				Event b1 = new Event(41, "b", 3.0);
-				Event b2 = new Event(41, "b", 4.0);
-				Event b3 = new Event(41, "b", 5.0);
-				Event d = new Event(43, "d", 4.0);
-
-				NFAState nfaState = nfa.createInitialNFAState();
-
-				nfa.process(sharedBufferAccessor, nfaState, a, 1);
-				nfa.process(sharedBufferAccessor, nfaState, b, 2);
-				nfa.process(sharedBufferAccessor, nfaState, c, 3);
-				nfa.process(sharedBufferAccessor, nfaState, b1, 4);
-				nfa.process(sharedBufferAccessor, nfaState, b2, 5);
-				nfa.process(sharedBufferAccessor, nfaState, b3, 6);
-				nfa.process(sharedBufferAccessor, nfaState, d, 7);
-				nfa.process(sharedBufferAccessor, nfaState, a, 8);
-
-				NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
-
-				//serialize
-				ByteArrayOutputStream baos = new ByteArrayOutputStream();
-				serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
-				baos.close();
-
-				// copy
-				ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
-				ByteArrayOutputStream out = new ByteArrayOutputStream();
-				serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
-				in.close();
-				out.close();
-
-				// deserialize
-				ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
-				NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
-				bais.close();
-				assertEquals(nfaState, copy);
-			}
+		for (Pattern<Event, ?> p : patterns) {
+			NFA<Event> nfa = compile(p, false);
+
+			Event a = new Event(40, "a", 1.0);
+			Event b = new Event(41, "b", 2.0);
+			Event c = new Event(42, "c", 3.0);
+			Event b1 = new Event(41, "b", 3.0);
+			Event b2 = new Event(41, "b", 4.0);
+			Event b3 = new Event(41, "b", 5.0);
+			Event d = new Event(43, "d", 4.0);
+
+			NFAState nfaState = nfa.createInitialNFAState();
+
+			NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
+
+			nfaTestHarness.consumeRecord(new StreamRecord<>(a, 1));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b, 2));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(c, 3));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b1, 4));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b2, 5));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(b3, 6));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(d, 7));
+			nfaTestHarness.consumeRecord(new StreamRecord<>(a, 8));
+
+			NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
+
+			//serialize
+			ByteArrayOutputStream baos = new ByteArrayOutputStream();
+			serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
+			baos.close();
+
+			// copy
+			ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
+			ByteArrayOutputStream out = new ByteArrayOutputStream();
+			serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
+			in.close();
+			out.close();
+
+			// deserialize
+			ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
+			NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
+			bais.close();
+			assertEquals(nfaState, copy);
 		}
 	}
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
index 11a8484..6f33218 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
index b7f1177..3d277af 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
@@ -33,8 +34,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
@@ -141,8 +142,9 @@ public void testClearingBuffer() throws Exception {
 	NFA<Event> nfa = compile(pattern, false);
 
 	NFAState nfaState = nfa.createInitialNFAState();
+	NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+	List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, b1, c1, d)
 	));
@@ -186,8 +188,9 @@ public void testClearingBufferWithUntilAtTheEnd() throws Exception {
 	NFA<Event> nfa = compile(pattern, false);
 
 	NFAState nfaState = nfa.createInitialNFAState();
+	NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-	List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+	List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 	compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 		Lists.newArrayList(a1, d1, d2, d3),
 		Lists.newArrayList(a1, d1, d2),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
index 049c84b..93d8c56 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesOrMoreITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
index 203a1c2..dd85d87 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/TimesRangeITCase.java
@@ -31,8 +31,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 
 /**
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
index be0f28b..39b23b4 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.cep.Event;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -31,8 +32,8 @@ import org.junit.Test;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
-import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.utils.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
 
@@ -92,8 +93,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -142,8 +144,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -193,8 +196,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -244,8 +248,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, breaking)
@@ -292,8 +297,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -342,8 +348,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
@@ -394,8 +401,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
@@ -525,8 +533,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -579,8 +588,9 @@ public class UntilConditionITCase {
 		NFA<Event> nfa = compile(pattern, false);
 
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
@@ -631,10 +641,10 @@ public class UntilConditionITCase {
 		});
 
 		NFA<Event> nfa = compile(pattern, false);
-
 		NFAState nfaState = nfa.createInitialNFAState();
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).withNFAState(nfaState).build();
 
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa, nfaState);
+		final List<List<Event>> resultingPatterns = nfaTestHarness.feedRecords(inputEvents);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
new file mode 100644
index 0000000..dd080fb
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.NFAState;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test harness for setting up {@link NFA}.
+ */
+public final class NFATestHarness {
+
+	private final SharedBuffer<Event> sharedBuffer;
+	private final NFA<Event> nfa;
+	private final NFAState nfaState;
+	private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+	private NFATestHarness(
+			SharedBuffer<Event> sharedBuffer,
+			NFA<Event> nfa,
+			NFAState nfaState,
+			AfterMatchSkipStrategy afterMatchSkipStrategy) {
+		this.sharedBuffer = sharedBuffer;
+		this.nfa = nfa;
+		this.nfaState = nfaState;
+		this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+	}
+
+	/**
+	 * Creates a builder for a test harness whose {@link NFA} is compiled from the given {@link Pattern}.
+	 */
+	public static NFATestHarnessBuilderPattern forPattern(Pattern<Event, ?> pattern) {
+		return new NFATestHarnessBuilderPattern(pattern);
+	}
+
+	/**
+	 * Creates a builder for a test harness that runs the given {@link NFA}.
+	 */
+	public static NFATestHarnessBuilderNFA forNFA(NFA<Event> nfa) {
+		return new NFATestHarnessBuilderNFA(nfa);
+	}
+
+	public List<List<Event>> feedRecords(List<StreamRecord<Event>> inputEvents) throws Exception {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			resultingPatterns.addAll(feedRecord(inputEvent));
+		}
+		return resultingPatterns;
+	}
+
+	public List<List<Event>> feedRecord(StreamRecord<Event> inputEvent) throws Exception {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+		Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
+		for (Map<String, List<Event>> p : matches) {
+			List<Event> res = new ArrayList<>();
+			for (List<Event> le : p.values()) {
+				res.addAll(le);
+			}
+			resultingPatterns.add(res);
+		}
+		return resultingPatterns;
+	}
+
+	public Collection<Map<String, List<Event>>> consumeRecords(Collection<StreamRecord<Event>> inputEvents) throws Exception {
+		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			resultingPatterns.addAll(consumeRecord(inputEvent));
+		}
+
+		return resultingPatterns;
+	}
+
+	public Collection<Map<String, List<Event>>> consumeRecord(StreamRecord<Event> inputEvent) throws Exception {
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
+			return nfa.process(
+				sharedBufferAccessor,
+				nfaState,
+				inputEvent.getValue(),
+				inputEvent.getTimestamp(),
+				afterMatchSkipStrategy);
+		}
+	}
+
+	/**
+	 * Builder for {@link NFATestHarness} that encapsulates {@link Pattern}.
+	 */
+	public static class NFATestHarnessBuilderPattern extends NFATestHarnessBuilderBase {
+
+		private final Pattern<Event, ?> pattern;
+		private boolean timeoutHandling = false;
+
+		NFATestHarnessBuilderPattern(Pattern<Event, ?> pattern) {
+			super(pattern.getAfterMatchSkipStrategy());
+			this.pattern = pattern;
+		}
+
+		public NFATestHarnessBuilderBase withTimeoutHandling() {
+			this.timeoutHandling = true;
+			return this;
+		}
+
+		@Override
+		public NFATestHarness build() {
+			NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
+			return new NFATestHarness(
+				sharedBuffer,
+				nfa,
+				nfa.createInitialNFAState(),
+				afterMatchSkipStrategy);
+		}
+	}
+
+	/**
+	 * Builder for {@link NFATestHarness} that encapsulates {@link NFA}.
+	 */
+	public static class NFATestHarnessBuilderNFA extends NFATestHarnessBuilderBase {
+
+		private final NFA<Event> nfa;
+		private NFAState nfaState;
+
+		NFATestHarnessBuilderNFA(NFA<Event> nfa) {
+			super(AfterMatchSkipStrategy.noSkip());
+			this.nfa = nfa;
+			this.nfaState = nfa.createInitialNFAState();
+		}
+
+		public NFATestHarnessBuilderBase withNFAState(NFAState nfaState) {
+			this.nfaState = nfaState;
+			return this;
+		}
+
+		@Override
+		public NFATestHarness build() {
+			return new NFATestHarness(sharedBuffer, nfa, nfaState, afterMatchSkipStrategy);
+		}
+	}
+
+	/**
+	 * Common builder, which can be used regardless of whether we start from a {@link Pattern} or an {@link NFA}.
+	 * Allows providing custom services such as a {@link SharedBuffer}.
+	 */
+	public abstract static class NFATestHarnessBuilderBase {
+
+		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+		NFATestHarnessBuilderBase(AfterMatchSkipStrategy skipStrategy) {
+			this.afterMatchSkipStrategy = skipStrategy;
+		}
+
+		public NFATestHarnessBuilderBase withSharedBuffer(SharedBuffer<Event> sharedBuffer) {
+			this.sharedBuffer = sharedBuffer;
+			return this;
+		}
+
+		public NFATestHarnessBuilderBase withAfterMatchSkipStrategy(AfterMatchSkipStrategy afterMatchSkipStrategy) {
+			this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+			return this;
+		}
+
+		public abstract NFATestHarness build();
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
similarity index 57%
rename from flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
index 91e490e..ac45798 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestUtilities.java
@@ -16,77 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.nfa;
+package org.apache.flink.cep.utils;
 
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
-import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import org.junit.Assert;
 
-import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
-import java.util.Map;
 
 /**
  * Base method for IT tests of {@link NFA}. It provides utility methods.
  */
 public class NFATestUtilities {
 
-	public static List<List<Event>> feedNFA(
-		List<StreamRecord<Event>> inputEvents,
-		NFA<Event> nfa) throws Exception {
-		return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), AfterMatchSkipStrategy.noSkip());
-	}
-
-	public static List<List<Event>> feedNFA(
-			List<StreamRecord<Event>> inputEvents,
-			NFA<Event> nfa,
-			NFAState nfaState) throws Exception {
-		return feedNFA(inputEvents, nfa, nfaState, AfterMatchSkipStrategy.noSkip());
-	}
-
-	public static List<List<Event>> feedNFA(
-		List<StreamRecord<Event>> inputEvents,
-		NFA<Event> nfa,
-		AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-		return feedNFA(inputEvents, nfa, nfa.createInitialNFAState(), afterMatchSkipStrategy);
-	}
-
+	@Deprecated
 	public static List<List<Event>> feedNFA(
 			List<StreamRecord<Event>> inputEvents,
-			NFA<Event> nfa,
-			NFAState nfaState,
-			AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
-				nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
-				Collection<Map<String, List<Event>>> patterns = nfa.process(
-					sharedBufferAccessor,
-					nfaState,
-					inputEvent.getValue(),
-					inputEvent.getTimestamp(),
-					afterMatchSkipStrategy);
-				for (Map<String, List<Event>> p: patterns) {
-					List<Event> res = new ArrayList<>();
-					for (List<Event> le: p.values()) {
-						res.addAll(le);
-					}
-					resultingPatterns.add(res);
-				}
-			}
-		}
-		return resultingPatterns;
+			NFA<Event> nfa) throws Exception {
+		NFATestHarness nfaTestHarness = NFATestHarness.forNFA(nfa).build();
+		return nfaTestHarness.feedRecords(inputEvents);
 	}
 
 	public static void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {