You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2019/01/09 08:13:11 UTC

[flink] 03/05: [FLINK-10596][cep] Added access to timer service in IterativeCondition

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 60f8826de75bdb1294ca4cd26b2af155de62c0df
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 12 09:17:40 2018 +0100

    [FLINK-10596][cep] Added access to timer service in IterativeCondition
---
 ...ernStreamScalaJavaAPIInteroperabilityTest.scala |  2 +-
 .../cep/functions/PatternProcessFunction.java      |  2 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java    | 84 ++++++++++----------
 .../org/apache/flink/cep/operator/CepOperator.java | 20 ++++-
 .../cep/pattern/conditions/IterativeCondition.java |  5 +-
 .../flink/cep/{context => time}/TimerContext.java  |  6 +-
 .../TimerContext.java => time/TimerService.java}   | 22 ++----
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   | 13 +++-
 .../nfa/NFAIterativeConditionTimerContextTest.java | 91 ++++++++++++++++++++++
 .../flink/cep/nfa/NFAStatusChangeITCase.java       | 21 ++---
 .../apache/flink/cep/operator/CEPOperatorTest.java | 35 +++++++++
 .../org/apache/flink/cep/utils/NFATestHarness.java | 35 ++++++---
 .../apache/flink/cep/utils/TestTimerService.java}  | 36 ++++-----
 13 files changed, 266 insertions(+), 106 deletions(-)

diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
index 5f6055d..80d96ac 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -150,7 +150,7 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
       outputs.getOrElseUpdate(outputTag, ListBuffer.empty).append(value)
     }
 
-    override def timestamp(): lang.Long = null
+    override def timestamp(): Long = 0
 
     override def currentProcessingTime(): Long = System.currentTimeMillis()
   }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
index b392501..b8e6821 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
@@ -20,7 +20,7 @@ package org.apache.flink.cep.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.cep.context.TimerContext;
+import org.apache.flink.cep.time.TimerContext;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 48bb587..869a641 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -38,6 +38,7 @@ import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -211,32 +212,8 @@ public class NFA<T> {
 	 * @param nfaState The NFAState object that we need to affect while processing
 	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
-	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
-	 * reached a final state) and the collection of timed out patterns (if timeout handling is
-	 * activated)
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	public Collection<Map<String, List<T>>> process(
-			final SharedBufferAccessor<T> sharedBufferAccessor,
-			final NFAState nfaState,
-			final T event,
-			final long timestamp) throws Exception {
-		return process(sharedBufferAccessor, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
-	}
-
-	/**
-	 * Processes the next input event. If some of the computations reach a final state then the
-	 * resulting event sequences are returned. If computations time out and timeout handling is
-	 * activated, then the timed out event patterns are returned.
-	 *
-	 * <p>If computations reach a stop state, the path forward is discarded and currently constructed path is returned
-	 * with the element that resulted in the stop state.
-	 *
-	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
-	 * @param nfaState The NFAState object that we need to affect while processing
-	 * @param event The current event to be processed or null if only pruning shall be done
-	 * @param timestamp The timestamp of the current event
 	 * @param afterMatchSkipStrategy The skip strategy to use after per match
+	 * @param timerService gives access to processing time and time characteristic, needed for condition evaluation
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
 	 * reached a final state) and the collection of timed out patterns (if timeout handling is
 	 * activated)
@@ -247,9 +224,10 @@ public class NFA<T> {
 			final NFAState nfaState,
 			final T event,
 			final long timestamp,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+			final AfterMatchSkipStrategy afterMatchSkipStrategy,
+			final TimerService timerService) throws Exception {
 		try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
-			return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy);
+			return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy, timerService);
 		}
 	}
 
@@ -305,7 +283,8 @@ public class NFA<T> {
 			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final EventWrapper event,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+			final AfterMatchSkipStrategy afterMatchSkipStrategy,
+			final TimerService timerService) throws Exception {
 
 		final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 		final PriorityQueue<ComputationState> potentialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
@@ -316,7 +295,7 @@ public class NFA<T> {
 				sharedBufferAccessor,
 				computationState,
 				event,
-				event.getTimestamp());
+				timerService);
 
 			if (newComputationStates.size() != 1) {
 				nfaState.setStateChanged();
@@ -555,7 +534,7 @@ public class NFA<T> {
 	 * @param sharedBufferAccessor The accessor to shared buffer that we need to change
 	 * @param computationState Current computation state
 	 * @param event Current event which is processed
-	 * @param timestamp Timestamp of the current event
+	 * @param timerService timer service which provides access to time related features
 	 * @return Collection of computation states which result from the current one
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
@@ -563,9 +542,13 @@ public class NFA<T> {
 			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final ComputationState computationState,
 			final EventWrapper event,
-			final long timestamp) throws Exception {
+			final TimerService timerService) throws Exception {
 
-		final ConditionContext<T> context = new ConditionContext<>(this, sharedBufferAccessor, computationState);
+		final ConditionContext context = new ConditionContext(
+			sharedBufferAccessor,
+			computationState,
+			timerService,
+			event.getTimestamp());
 
 		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
 
@@ -628,7 +611,7 @@ public class NFA<T> {
 					final long startTimestamp;
 					final EventId startEventId;
 					if (isStartState(computationState)) {
-						startTimestamp = timestamp;
+						startTimestamp = event.getTimestamp();
 						startEventId = event.getEventId();
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
@@ -694,7 +677,7 @@ public class NFA<T> {
 	}
 
 	private State<T> findFinalStateAfterProceed(
-			ConditionContext<T> context,
+			ConditionContext context,
 			State<T> state,
 			T event) {
 		final Stack<State<T>> statesToCheck = new Stack<>();
@@ -725,7 +708,7 @@ public class NFA<T> {
 	}
 
 	private OutgoingEdges<T> createDecisionGraph(
-			ConditionContext<T> context,
+			ConditionContext context,
 			ComputationState computationState,
 			T event) {
 		State<T> state = getState(computationState);
@@ -765,7 +748,7 @@ public class NFA<T> {
 	}
 
 	private boolean checkFilterCondition(
-			ConditionContext<T> context,
+			ConditionContext context,
 			IterativeCondition<T> condition,
 			T event) throws Exception {
 		return condition == null || condition.filter(event, context);
@@ -804,7 +787,11 @@ public class NFA<T> {
 	/**
 	 * The context used when evaluating this computation state.
 	 */
-	private static class ConditionContext<T> implements IterativeCondition.Context<T> {
+	private class ConditionContext implements IterativeCondition.Context<T> {
+
+		private final TimerService timerService;
+
+		private final long eventTimestamp;
 
 		/** The current computation state. */
 		private ComputationState computationState;
@@ -816,17 +803,17 @@ public class NFA<T> {
 		 */
 		private Map<String, List<T>> matchedEvents;
 
-		private NFA<T> nfa;
-
 		private SharedBufferAccessor<T> sharedBufferAccessor;
 
 		ConditionContext(
-				final NFA<T> nfa,
 				final SharedBufferAccessor<T> sharedBufferAccessor,
-				final ComputationState computationState) {
+				final ComputationState computationState,
+				final TimerService timerService,
+				final long eventTimestamp) {
 			this.computationState = computationState;
-			this.nfa = nfa;
 			this.sharedBufferAccessor = sharedBufferAccessor;
+			this.timerService = timerService;
+			this.eventTimestamp = eventTimestamp;
 		}
 
 		@Override
@@ -837,7 +824,8 @@ public class NFA<T> {
 			// this is to avoid any overheads when using a simple, non-iterative condition.
 
 			if (matchedEvents == null) {
-				this.matchedEvents = sharedBufferAccessor.materializeMatch(nfa.extractCurrentMatches(sharedBufferAccessor,
+				this.matchedEvents = sharedBufferAccessor.materializeMatch(extractCurrentMatches(
+					sharedBufferAccessor,
 					computationState));
 			}
 
@@ -851,6 +839,16 @@ public class NFA<T> {
 				}
 			};
 		}
+
+		@Override
+		public long timestamp() {
+			return eventTimestamp;
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return timerService.currentProcessingTime();
+		}
 	}
 
 	////////////////////				DEPRECATED/MIGRATION UTILS
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 262d89d..357fbc7 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -40,6 +40,7 @@ import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -130,6 +131,9 @@ public class CepOperator<IN, KEY, OUT>
 	/** Wrapped RuntimeContext that limits the underlying context features. */
 	private transient CepRuntimeContext cepRuntimeContext;
 
+	/** Thin context passed to NFA that gives access to time related characteristics. */
+	private transient TimerService cepTimerService;
+
 	public CepOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
@@ -216,6 +220,7 @@ public class CepOperator<IN, KEY, OUT>
 
 		context = new ContextFunctionImpl();
 		collector = new TimestampedCollector<>(output);
+		cepTimerService = new TimerServiceImpl();
 	}
 
 	@Override
@@ -415,7 +420,7 @@ public class CepOperator<IN, KEY, OUT>
 	private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
 		try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
 			Collection<Map<String, List<IN>>> patterns =
-				nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy);
+				nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy, cepTimerService);
 			processMatchedSequences(patterns, timestamp);
 		}
 	}
@@ -465,6 +470,19 @@ public class CepOperator<IN, KEY, OUT>
 	}
 
 	/**
+	 * Gives {@link NFA} access to {@link InternalTimerService} and tells if {@link CepOperator} works in
+	 * processing time. Should be instantiated once per operator.
+	 */
+	private class TimerServiceImpl implements TimerService {
+
+		@Override
+		public long currentProcessingTime() {
+			return timerService.currentProcessingTime();
+		}
+
+	}
+
+	/**
 	 * Implementation of {@link PatternProcessFunction.Context}. Design to be instantiated once per operator.
 	 * It serves three methods:
 	 *  <ul>
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index f716ec4..786cb08 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -18,7 +18,9 @@
 
 package org.apache.flink.cep.pattern.conditions;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.cep.time.TimerContext;
 
 import java.io.Serializable;
 
@@ -59,6 +61,7 @@ import java.io.Serializable;
  * the elements that belong to the pattern among the elements stored by the NFA. The cost of this operation can vary,
  * so when implementing your condition, try to minimize the times the method is called.
  */
+@PublicEvolving
 public abstract class IterativeCondition<T> implements Function, Serializable {
 
 	private static final long serialVersionUID = 7067817235759351255L;
@@ -84,7 +87,7 @@ public abstract class IterativeCondition<T> implements Function, Serializable {
 	/**
 	 * The context used when evaluating the {@link IterativeCondition condition}.
 	 */
-	public interface Context<T> {
+	public interface Context<T> extends TimerContext {
 
 		/**
 		 * @return An {@link Iterable} over the already accepted elements
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
similarity index 91%
copy from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
copy to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
index 4f0eab2..9b5db52 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerContext.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.time;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.cep.functions.PatternProcessFunction;
@@ -32,8 +32,8 @@ public interface TimerContext {
 	/**
 	 * Timestamp of the element currently being processed.
 	 *
-	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
-	 * time when event entered the cep operator.
+	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this means the
+	 * time when the event entered the cep operator.
 	 */
 	long timestamp();
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
similarity index 58%
copy from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
copy to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
index 4f0eab2..464e267 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/time/TimerService.java
@@ -16,28 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.time;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.annotation.Internal;
 
 /**
- * Enables access to time related characteristics such as current processing time or timestamp of currently processed
- * element. Used in {@link PatternProcessFunction} and
- * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
+ * Provides time characteristics to {@link org.apache.flink.cep.nfa.NFA} for use in
+ * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}.
  */
-@PublicEvolving
-public interface TimerContext {
+@Internal
+public interface TimerService {
 
 	/**
-	 * Timestamp of the element currently being processed.
-	 *
-	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
-	 * time when event entered the cep operator.
+	 * Current processing time as returned from {@link org.apache.flink.streaming.api.TimerService}.
 	 */
-	long timestamp();
-
-	/** Returns the current processing time. */
 	long currentProcessingTime();
 
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 62d76db..4b1f627 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.utils.NFATestHarness;
 import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.TestTimerService;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
@@ -420,7 +421,8 @@ public class NFAITCase extends TestLogger {
 					nfaState,
 					event.getValue(),
 					event.getTimestamp(),
-					AfterMatchSkipStrategy.noSkip());
+					AfterMatchSkipStrategy.noSkip(),
+					new TestTimerService());
 
 			resultingPatterns.addAll(matchedPatterns);
 			resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2830,10 +2832,13 @@ public class NFAITCase extends TestLogger {
 		Event a = new Event(40, "a", 1.0);
 		Event b = new Event(41, "b", 2.0);
 
+		NFA<Event> nfa = compile(pattern, false);
+		TestTimerService timerService = new TestTimerService();
 		try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
-			NFA<Event> nfa = compile(pattern, false);
-			nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip());
-			nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip());
+			nfa.process(accessor, nfa.createInitialNFAState(), a, 1, AfterMatchSkipStrategy.noSkip(),
+				timerService);
+			nfa.process(accessor, nfa.createInitialNFAState(), b, 2, AfterMatchSkipStrategy.noSkip(),
+				timerService);
 			Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
 			nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
 			Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
new file mode 100644
index 0000000..3f9b26a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAIterativeConditionTimerContextTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.utils.NFATestHarness;
+import org.apache.flink.cep.utils.TestTimerService;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.flink.cep.utils.EventBuilder.event;
+import static org.apache.flink.cep.utils.NFATestHarness.forPattern;
+import static org.apache.flink.cep.utils.NFATestUtilities.compareMaps;
+
+/**
+ * Tests for accessing time properties from {@link IterativeCondition}.
+ */
+public class NFAIterativeConditionTimerContextTest extends TestLogger {
+
+	@Test
+	public void testEventTimestamp() throws Exception {
+		final Event event = event().withId(1).build();
+		final long timestamp = 3;
+
+		final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				return ctx.timestamp() == timestamp;
+			}
+		});
+
+		final NFATestHarness testHarness = forPattern(pattern).build();
+
+		final List<List<Event>> resultingPattern = testHarness.feedRecord(new StreamRecord<>(event, timestamp));
+
+		compareMaps(resultingPattern, Collections.singletonList(
+			Collections.singletonList(event)
+		));
+	}
+
+	@Test
+	public void testCurrentProcessingTime() throws Exception {
+		final Event event1 = event().withId(1).build();
+		final Event event2 = event().withId(2).build();
+
+		final Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new IterativeCondition<Event>() {
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				return ctx.currentProcessingTime() == 3;
+			}
+		});
+
+		final TestTimerService cepTimerService = new TestTimerService();
+		final NFATestHarness testHarness = forPattern(pattern)
+			.withTimerService(cepTimerService)
+			.build();
+
+		cepTimerService.setCurrentProcessingTime(1);
+		final List<List<Event>> resultingPatterns1 = testHarness.feedRecord(new StreamRecord<>(event1, 7));
+		cepTimerService.setCurrentProcessingTime(3);
+		final List<List<Event>> resultingPatterns2 = testHarness.feedRecord(new StreamRecord<>(event2, 8));
+
+		compareMaps(resultingPatterns1, Collections.emptyList());
+		compareMaps(resultingPatterns2, Collections.singletonList(
+			Collections.singletonList(event2)
+		));
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index d0a7853..c5e6682 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -26,7 +26,9 @@ import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.cep.utils.TestSharedBuffer;
+import org.apache.flink.cep.utils.TestTimerService;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
 import org.junit.After;
@@ -49,6 +51,7 @@ public class NFAStatusChangeITCase {
 	private SharedBuffer<Event> sharedBuffer;
 	private SharedBufferAccessor<Event> sharedBufferAccessor;
 	private AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.noSkip();
+	private TimerService timerService = new TestTimerService();
 
 	@Before
 	public void init() {
@@ -97,32 +100,32 @@ public class NFAStatusChangeITCase {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L, skipStrategy, timerService);
 		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
 
 		// the status of the queue of ComputationStatus changed,
 		// more than one ComputationStatus is generated by the event from some ComputationStatus
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L, skipStrategy, timerService);
 		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L, skipStrategy, timerService);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
@@ -166,10 +169,10 @@ public class NFAStatusChangeITCase {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy, timerService);
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L, skipStrategy, timerService);
 		assertTrue(nfaState.isStateChanged());
 	}
 
@@ -195,7 +198,7 @@ public class NFAStatusChangeITCase {
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
-		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L, skipStrategy, timerService);
 
 		nfaState.resetStateChanged();
 		nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 2e4b41d..f496dc8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -30,10 +30,14 @@ import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.functions.PatternProcessFunction;
 import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.NFAState;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.mock.Whitebox;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
@@ -64,10 +68,16 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import static org.apache.flink.cep.utils.CepOperatorBuilder.createOperatorForNFA;
+import static org.apache.flink.cep.utils.EventBuilder.event;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.validateMockitoUsage;
+import static org.mockito.Mockito.verify;
 
 /**
  * Tests for {@link CepOperator}.
@@ -101,6 +111,31 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
+	public void testProcessingTimestampisPassedToNFA() throws Exception {
+
+		final NFA<Event> nfa = NFACompiler.compileFactory(Pattern.<Event>begin("begin"), true).createNFA();
+		final NFA<Event> spyNFA = spy(nfa);
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
+				CepOperatorTestUtilities.getCepTestHarness(createOperatorForNFA(spyNFA).build())) {
+
+			long timestamp = 5;
+			harness.open();
+			harness.setProcessingTime(timestamp);
+			StreamRecord<Event> event = event().withTimestamp(3).asStreamRecord();
+			harness.processElement(event);
+			verify(spyNFA).process(
+				any(SharedBufferAccessor.class),
+				any(NFAState.class),
+				eq(event.getValue()),
+				eq(timestamp),
+				any(AfterMatchSkipStrategy.class),
+				any(TimerService.class));
+		}
+	}
+
+	@Test
 	public void testKeyedCEPOperatorCheckpointing() throws Exception {
 
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(false);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
index dd080fb..f8e0f57 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/NFATestHarness.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.time.TimerService;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.ArrayList;
@@ -41,16 +42,19 @@ public final class NFATestHarness {
 	private final NFA<Event> nfa;
 	private final NFAState nfaState;
 	private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+	private final TimerService timerService;
 
 	private NFATestHarness(
 			SharedBuffer<Event> sharedBuffer,
 			NFA<Event> nfa,
 			NFAState nfaState,
-			AfterMatchSkipStrategy afterMatchSkipStrategy) {
+			AfterMatchSkipStrategy afterMatchSkipStrategy,
+			TimerService timerService) {
 		this.sharedBuffer = sharedBuffer;
 		this.nfa = nfa;
 		this.nfaState = nfaState;
 		this.afterMatchSkipStrategy = afterMatchSkipStrategy;
+		this.timerService = timerService;
 	}
 
 	/**
@@ -68,7 +72,7 @@ public final class NFATestHarness {
 	}
 
 	public List<List<Event>> feedRecords(List<StreamRecord<Event>> inputEvents) throws Exception {
-		List<List<Event>> resultingPatterns = new ArrayList<>();
+		final List<List<Event>> resultingPatterns = new ArrayList<>();
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			resultingPatterns.addAll(feedRecord(inputEvent));
 		}
@@ -76,8 +80,8 @@ public final class NFATestHarness {
 	}
 
 	public List<List<Event>> feedRecord(StreamRecord<Event> inputEvent) throws Exception {
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-		Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
+		final List<List<Event>> resultingPatterns = new ArrayList<>();
+		final Collection<Map<String, List<Event>>> matches = consumeRecord(inputEvent);
 		for (Map<String, List<Event>> p : matches) {
 			List<Event> res = new ArrayList<>();
 			for (List<Event> le : p.values()) {
@@ -89,7 +93,7 @@ public final class NFATestHarness {
 	}
 
 	public Collection<Map<String, List<Event>>> consumeRecords(Collection<StreamRecord<Event>> inputEvents) throws Exception {
-		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+		final List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			resultingPatterns.addAll(consumeRecord(inputEvent));
 		}
@@ -105,7 +109,8 @@ public final class NFATestHarness {
 				nfaState,
 				inputEvent.getValue(),
 				inputEvent.getTimestamp(),
-				afterMatchSkipStrategy);
+				afterMatchSkipStrategy,
+				timerService);
 		}
 	}
 
@@ -129,12 +134,13 @@ public final class NFATestHarness {
 
 		@Override
 		public NFATestHarness build() {
-			NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
+			final NFA<Event> nfa = NFAUtils.compile(pattern, timeoutHandling);
 			return new NFATestHarness(
 				sharedBuffer,
 				nfa,
 				nfa.createInitialNFAState(),
-				afterMatchSkipStrategy);
+				afterMatchSkipStrategy,
+				timerService);
 		}
 	}
 
@@ -159,7 +165,12 @@ public final class NFATestHarness {
 
 		@Override
 		public NFATestHarness build() {
-			return new NFATestHarness(sharedBuffer, nfa, nfaState, afterMatchSkipStrategy);
+			return new NFATestHarness(
+				sharedBuffer,
+				nfa,
+				nfaState,
+				afterMatchSkipStrategy,
+				timerService);
 		}
 	}
 
@@ -171,6 +182,7 @@ public final class NFATestHarness {
 
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
 		AfterMatchSkipStrategy afterMatchSkipStrategy;
+		TimerService timerService = new TestTimerService();
 
 		NFATestHarnessBuilderBase(AfterMatchSkipStrategy skipStrategy) {
 			this.afterMatchSkipStrategy = skipStrategy;
@@ -186,6 +198,11 @@ public final class NFATestHarness {
 			return this;
 		}
 
+		public NFATestHarnessBuilderBase withTimerService(TimerService timerService) {
+			this.timerService = timerService;
+			return this;
+		}
+
 		public abstract NFATestHarness build();
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
similarity index 51%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
index 4f0eab2..5fd9755 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestTimerService.java
@@ -16,28 +16,26 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep.context;
+package org.apache.flink.cep.utils;
 
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.time.TimerService;
 
 /**
- * Enables access to time related characteristics such as current processing time or timestamp of currently processed
- * element. Used in {@link PatternProcessFunction} and
- * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}
+ * Test implementation of {@link TimerService}. Provides setters for encapsulated properties.
  */
-@PublicEvolving
-public interface TimerContext {
-
-	/**
-	 * Timestamp of the element currently being processed.
-	 *
-	 * <p>In case of {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime} this will be set to the
-	 * time when event entered the cep operator.
-	 */
-	long timestamp();
-
-	/** Returns the current processing time. */
-	long currentProcessingTime();
+public class TestTimerService implements TimerService {
+
+	public static final long UNDEFINED_TIME = Long.MIN_VALUE; // reported until a processing time has been set
+
+	private long processingTime = UNDEFINED_TIME; // primitive on purpose: never null, avoids boxing on every get/set
+
+	@Override
+	public long currentProcessingTime() {
+		return processingTime;
+	}
+
+	public void setCurrentProcessingTime(long processingTime) {
+		this.processingTime = processingTime;
+	}
 
 }