You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:26 UTC

[09/10] flink git commit: [FLINK-9418] Separated pruning and element processing paths

[FLINK-9418] Separated pruning and element processing paths

This closes #6059


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/05ee3ce9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/05ee3ce9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/05ee3ce9

Branch: refs/heads/master
Commit: 05ee3ce96ae51c5df9069564fd5cd2482a62de39
Parents: 9218df8
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Tue Jun 5 18:57:41 2018 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Wed Jun 13 14:59:59 2018 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 135 ++++++++++++-------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |   4 +-
 .../flink/cep/nfa/sharedbuffer/EventId.java     |  15 ++-
 .../flink/cep/nfa/sharedbuffer/Lockable.java    |   6 +-
 .../cep/nfa/sharedbuffer/SharedBuffer.java      |  87 ++++++------
 .../AbstractKeyedCEPPatternOperator.java        |  43 +++---
 .../org/apache/flink/cep/nfa/NFAITCase.java     |  40 ++++--
 .../flink/cep/nfa/NFAStatusChangeITCase.java    |   7 +-
 .../java/org/apache/flink/cep/nfa/NFATest.java  |   3 +-
 .../apache/flink/cep/nfa/NFATestUtilities.java  |   3 +-
 .../cep/nfa/sharedbuffer/SharedBufferTest.java  |  41 ++++--
 .../flink/cep/utils/TestSharedBuffer.java       |   6 +
 .../flink/runtime/state/KeyedStateFunction.java |   5 +-
 .../api/operators/AbstractStreamOperator.java   |   2 -
 14 files changed, 246 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 227a34d..041a017 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -105,8 +105,7 @@ public class NFA<T> {
 	public NFA(
 			final Collection<State<T>> validStates,
 			final long windowTime,
-			final boolean handleTimeout
-	) {
+			final boolean handleTimeout) {
 		this.windowTime = windowTime;
 		this.handleTimeout = handleTimeout;
 		this.states = loadStates(validStates);
@@ -169,7 +168,6 @@ public class NFA<T> {
 		return stateObject.isFinal();
 	}
 
-
 	/**
 	 * Processes the next input event. If some of the computations reach a final state then the
 	 * resulting event sequences are returned. If computations time out and timeout handling is
@@ -185,8 +183,9 @@ public class NFA<T> {
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
 	 * reached a final state) and the collection of timed out patterns (if timeout handling is
 	 * activated)
+	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(
+	public Collection<Map<String, List<T>>> process(
 			final SharedBuffer<T> sharedBuffer,
 			final NFAState nfaState,
 			final T event,
@@ -210,61 +209,91 @@ public class NFA<T> {
 	 * @return Tuple of the collection of matched patterns (e.g. the result of computations which have
 	 * reached a final state) and the collection of timed out patterns (if timeout handling is
 	 * activated)
+	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	public Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> process(
+	public Collection<Map<String, List<T>>> process(
 			final SharedBuffer<T> sharedBuffer,
 			final NFAState nfaState,
 			final T event,
 			final long timestamp,
 			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-
 		try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBuffer)) {
 			return doProcess(sharedBuffer, nfaState, eventWrapper, afterMatchSkipStrategy);
 		}
 	}
 
-	private Tuple2<Collection<Map<String, List<T>>>, Collection<Tuple2<Map<String, List<T>>, Long>>> doProcess(
+	/**
+	 * Prunes states assuming there will be no events with timestamp <b>lower</b> than the given one.
+	 * It cleares the sharedBuffer and also emits all timed out partial matches.
+	 *
+	 * @param sharedBuffer the SharedBuffer object that we need to work upon while processing
+	 * @param nfaState     The NFAState object that we need to affect while processing
+	 * @param timestamp    timestamp that indicates that there will be no more events with lower timestamp
+	 * @return all timed outed partial matches
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
 			final SharedBuffer<T> sharedBuffer,
 			final NFAState nfaState,
-			final EventWrapper event,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+			final long timestamp) throws Exception {
 
 		Queue<ComputationState> computationStates = nfaState.getComputationStates();
-
-		final int numberComputationStates = computationStates.size();
-		final Collection<Map<String, List<T>>> result = new ArrayList<>();
 		final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
 
-		// iterate over all current computations
+		final int numberComputationStates = computationStates.size();
 		for (int i = 0; i < numberComputationStates; i++) {
 			ComputationState computationState = computationStates.poll();
 
-			final Collection<ComputationState> newComputationStates;
-
-			if (!isStartState(computationState) &&
-				windowTime > 0L &&
-				event.getTimestamp() - computationState.getStartTimestamp() >= windowTime) {
+			if (isStateTimedOut(computationState, timestamp)) {
 
 				if (handleTimeout) {
 					// extract the timed out event pattern
 					Map<String, List<T>> timedOutPattern = extractCurrentMatches(sharedBuffer, computationState);
-					timeoutResult.add(Tuple2.of(timedOutPattern, event.getTimestamp()));
+					timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
 				}
 
 				sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
 
-				newComputationStates = Collections.emptyList();
 				nfaState.setStateChanged();
-			} else if (event.getEvent() != null) {
-				newComputationStates = computeNextStates(sharedBuffer, computationState, event, event.getTimestamp());
-
-				if (newComputationStates.size() != 1) {
-					nfaState.setStateChanged();
-				} else if (!newComputationStates.iterator().next().equals(computationState)) {
-					nfaState.setStateChanged();
-				}
 			} else {
-				newComputationStates = Collections.singleton(computationState);
+				computationStates.add(computationState);
+			}
+		}
+
+		sharedBuffer.advanceTime(timestamp);
+
+		return timeoutResult;
+	}
+
+	private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
+		return !isStartState(state) && windowTime > 0L && timestamp - state.getStartTimestamp() >= windowTime;
+	}
+
+	private Collection<Map<String, List<T>>> doProcess(
+			final SharedBuffer<T> sharedBuffer,
+			final NFAState nfaState,
+			final EventWrapper event,
+			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
+
+		Queue<ComputationState> computationStates = nfaState.getComputationStates();
+
+		final int numberComputationStates = computationStates.size();
+		final Collection<Map<String, List<T>>> result = new ArrayList<>();
+
+		// iterate over all current computations
+		for (int i = 0; i < numberComputationStates; i++) {
+			ComputationState computationState = computationStates.poll();
+
+			final Collection<ComputationState> newComputationStates = computeNextStates(
+				sharedBuffer,
+				computationState,
+				event,
+				event.getTimestamp());
+
+			if (newComputationStates.size() != 1) {
+				nfaState.setStateChanged();
+			} else if (!newComputationStates.iterator().next().equals(computationState)) {
+				nfaState.setStateChanged();
 			}
 
 			//delay adding new computation states in case a stop state is reached and we discard the path.
@@ -299,13 +328,12 @@ public class NFA<T> {
 			} else {
 				computationStates.addAll(statesToRetain);
 			}
-
 		}
 
 		discardComputationStatesAccordingToStrategy(
 			sharedBuffer, computationStates, result, afterMatchSkipStrategy);
 
-		return Tuple2.of(result, timeoutResult);
+		return result;
 	}
 
 	private void discardComputationStatesAccordingToStrategy(
@@ -500,6 +528,7 @@ public class NFA<T> {
 	 * @param event Current event which is processed
 	 * @param timestamp Timestamp of the current event
 	 * @return Collection of computation states which result from the current one
+	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	private Collection<ComputationState> computeNextStates(
 			final SharedBuffer<T> sharedBuffer,
@@ -558,22 +587,17 @@ public class NFA<T> {
 					final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
 					takeBranchesToVisit--;
 
-					final NodeId newEntry;
+					final NodeId newEntry = sharedBuffer.put(
+						currentState.getName(),
+						event.getEventId(),
+						previousEntry,
+						currentVersion);
+
 					final long startTimestamp;
 					if (isStartState(computationState)) {
 						startTimestamp = timestamp;
-						newEntry = sharedBuffer.put(
-							currentState.getName(),
-							event.getEventId(),
-							previousEntry,
-							currentVersion);
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
-						newEntry = sharedBuffer.put(
-							currentState.getName(),
-							event.getEventId(),
-							previousEntry,
-							currentVersion);
 					}
 
 					addComputationState(
@@ -631,7 +655,10 @@ public class NFA<T> {
 		sharedBuffer.lockNode(previousEntry);
 	}
 
-	private State<T> findFinalStateAfterProceed(SharedBuffer<T> sharedBuffer, State<T> state, T event,
+	private State<T> findFinalStateAfterProceed(
+			SharedBuffer<T> sharedBuffer,
+			State<T> state,
+			T event,
 			ComputationState computationState) {
 		final Stack<State<T>> statesToCheck = new Stack<>();
 		statesToCheck.push(state);
@@ -661,7 +688,9 @@ public class NFA<T> {
 		return takeBranches == 0 && ignoreBranches == 0 ? 0 : ignoreBranches + Math.max(1, takeBranches);
 	}
 
-	private OutgoingEdges<T> createDecisionGraph(SharedBuffer<T> sharedBuffer, ComputationState computationState,
+	private OutgoingEdges<T> createDecisionGraph(
+			SharedBuffer<T> sharedBuffer,
+			ComputationState computationState,
 			T event) {
 		State<T> state = getState(computationState);
 		final OutgoingEdges<T> outgoingEdges = new OutgoingEdges<>(state);
@@ -699,8 +728,11 @@ public class NFA<T> {
 		return outgoingEdges;
 	}
 
-	private boolean checkFilterCondition(SharedBuffer<T> sharedBuffer, ComputationState computationState,
-			IterativeCondition<T> condition, T event) throws Exception {
+	private boolean checkFilterCondition(
+			SharedBuffer<T> sharedBuffer,
+			ComputationState computationState,
+			IterativeCondition<T> condition,
+			T event) throws Exception {
 		return condition == null || condition.filter(event, new ConditionContext<>(this, sharedBuffer, computationState));
 	}
 
@@ -712,8 +744,10 @@ public class NFA<T> {
 	 * @param sharedBuffer The {@link SharedBuffer} from which to extract the matches
 	 * @param computationState The end computation state of the extracted event sequences
 	 * @return Collection of event sequences which end in the given computation state
+	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	private Map<String, List<T>> extractCurrentMatches(final SharedBuffer<T> sharedBuffer,
+	private Map<String, List<T>> extractCurrentMatches(
+			final SharedBuffer<T> sharedBuffer,
 			final ComputationState computationState) throws Exception {
 		if (computationState.getPreviousBufferEntry() == null) {
 			return new HashMap<>();
@@ -819,8 +853,7 @@ public class NFA<T> {
 
 		MigratedNFA(
 				final Queue<ComputationState> computationStates,
-				final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer
-		) {
+				final org.apache.flink.cep.nfa.SharedBuffer<T> sharedBuffer) {
 			this.sharedBuffer = sharedBuffer;
 			this.computationStates = computationStates;
 		}
@@ -838,8 +871,8 @@ public class NFA<T> {
 		public NFASerializerConfigSnapshot() {}
 
 		public NFASerializerConfigSnapshot(
-			TypeSerializer<T> eventSerializer,
-			TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
+				TypeSerializer<T> eventSerializer,
+				TypeSerializer<org.apache.flink.cep.nfa.SharedBuffer<T>> sharedBufferSerializer) {
 
 			super(eventSerializer, sharedBufferSerializer);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 7a43537..a4dbc00 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -251,7 +251,7 @@ public class SharedBuffer<V> {
 			Map<ValueTimeWrapper<V>, EventId> values = new HashMap<>();
 			Map<EventId, Lockable<V>> valuesWithIds = new HashMap<>();
 			Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext = new HashMap<>();
-			Map<Long, Long> totalEventsPerTimestamp = new HashMap<>();
+			Map<Long, Integer> totalEventsPerTimestamp = new HashMap<>();
 			int totalPages = source.readInt();
 
 			for (int i = 0; i < totalPages; i++) {
@@ -263,7 +263,7 @@ public class SharedBuffer<V> {
 					ValueTimeWrapper<V> wrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
 					EventId eventId = values.get(wrapper);
 					if (eventId == null) {
-						long id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0L);
+						int id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0);
 						eventId = new EventId(id, wrapper.timestamp);
 						values.put(wrapper, eventId);
 						valuesWithIds.put(eventId, new Lockable<>(wrapper.value, 1));

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
index 9b99ea1..57d244a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep.nfa.sharedbuffer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.core.memory.DataInputView;
@@ -31,15 +32,15 @@ import java.util.Objects;
  * Composite key for events in {@link SharedBuffer}.
  */
 public class EventId {
-	private final long id;
+	private final int id;
 	private final long timestamp;
 
-	public EventId(long id, long timestamp) {
+	public EventId(int id, long timestamp) {
 		this.id = id;
 		this.timestamp = timestamp;
 	}
 
-	public long getId() {
+	public int getId() {
 		return id;
 	}
 
@@ -110,14 +111,14 @@ public class EventId {
 
 		@Override
 		public void serialize(EventId record, DataOutputView target) throws IOException {
-			LongSerializer.INSTANCE.serialize(record.id, target);
+			IntSerializer.INSTANCE.serialize(record.id, target);
 			LongSerializer.INSTANCE.serialize(record.timestamp, target);
 		}
 
 		@Override
 		public EventId deserialize(DataInputView source) throws IOException {
-			Long id = LongSerializer.INSTANCE.deserialize(source);
-			Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+			int id = IntSerializer.INSTANCE.deserialize(source);
+			long timestamp = LongSerializer.INSTANCE.deserialize(source);
 
 			return new EventId(id, timestamp);
 		}
@@ -129,7 +130,7 @@ public class EventId {
 
 		@Override
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			LongSerializer.INSTANCE.copy(source, target);
+			IntSerializer.INSTANCE.copy(source, target);
 			LongSerializer.INSTANCE.copy(source, target);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
index ca1ecae..b782d8a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -62,10 +62,6 @@ public final class Lockable<T> {
 		return refCounter == 0;
 	}
 
-	int getRefCounter() {
-		return refCounter;
-	}
-
 	public T getElement() {
 		return element;
 	}
@@ -143,7 +139,7 @@ public final class Lockable<T> {
 
 		@Override
 		public Lockable<E> deserialize(DataInputView source) throws IOException {
-			Integer refCount = IntSerializer.INSTANCE.deserialize(source);
+			int refCount = IntSerializer.INSTANCE.deserialize(source);
 			E record = elementSerializer.deserialize(source);
 			return new Lockable<>(record, refCount);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index 50d997c..32be5da 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -18,10 +18,12 @@
 
 package org.apache.flink.cep.nfa.sharedbuffer;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.state.KeyedStateStore;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.DeweyNumber;
@@ -33,6 +35,7 @@ import org.apache.commons.lang3.StringUtils;
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -69,8 +72,8 @@ public class SharedBuffer<V> {
 	private MapState<EventId, Lockable<V>> eventsBuffer;
 
 	/** The number of events seen so far in the stream per timestamp. */
-	private MapState<Long, Long> eventsCount;
-	private MapState<NodeId, Lockable<SharedBufferNode>> pages;
+	private MapState<Long, Integer> eventsCount;
+	private MapState<NodeId, Lockable<SharedBufferNode>> entries;
 
 	public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
 		this.eventsBuffer = stateStore.getMapState(
@@ -79,7 +82,7 @@ public class SharedBuffer<V> {
 				EventId.EventIdSerializer.INSTANCE,
 				new Lockable.LockableTypeSerializer<>(valueSerializer)));
 
-		this.pages = stateStore.getMapState(
+		this.entries = stateStore.getMapState(
 			new MapStateDescriptor<>(
 				entriesStateName,
 				NodeId.NodeIdSerializer.INSTANCE,
@@ -89,7 +92,24 @@ public class SharedBuffer<V> {
 			new MapStateDescriptor<>(
 				eventsCountStateName,
 				LongSerializer.INSTANCE,
-				LongSerializer.INSTANCE));
+				IntSerializer.INSTANCE));
+	}
+
+	/**
+	 * Notifies shared buffer that there will be no events with timestamp &lt;&eq; the given value. I allows to clear
+	 * internal counters for number of events seen so far per timestamp.
+	 *
+	 * @param timestamp watermark, no earlier events will arrive
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void advanceTime(long timestamp) throws Exception {
+		Iterator<Long> iterator = eventsCount.keys().iterator();
+		while (iterator.hasNext()) {
+			Long next = iterator.next();
+			if (next < timestamp) {
+				iterator.remove();
+			}
+		}
 	}
 
 	/**
@@ -104,14 +124,14 @@ public class SharedBuffer<V> {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public EventId registerEvent(V value, long timestamp) throws Exception {
-		Long id = eventsCount.get(timestamp);
+		Integer id = eventsCount.get(timestamp);
 		if (id == null) {
-			id = 0L;
+			id = 0;
 		}
 
 		EventId eventId = new EventId(id, timestamp);
 		eventsBuffer.put(eventId, new Lockable<>(value, 1));
-		eventsCount.put(timestamp, id + 1L);
+		eventsCount.put(timestamp, id + 1);
 		return eventId;
 	}
 
@@ -129,9 +149,9 @@ public class SharedBuffer<V> {
 			Map<EventId, Lockable<V>> events,
 			Map<NodeId, Lockable<SharedBufferNode>> entries) throws Exception {
 		eventsBuffer.putAll(events);
-		pages.putAll(entries);
+		this.entries.putAll(entries);
 
-		Map<Long, Long> maxIds = events.keySet().stream().collect(Collectors.toMap(
+		Map<Long, Integer> maxIds = events.keySet().stream().collect(Collectors.toMap(
 			EventId::getTimestamp,
 			EventId::getId,
 			Math::max
@@ -140,31 +160,13 @@ public class SharedBuffer<V> {
 	}
 
 	/**
-	 * Stores given value (value + timestamp) under the given state. It assigns no preceding element
-	 * relation to the entry.
-	 *
-	 * @param stateName name of the state that the event should be assigned to
-	 * @param eventId   unique id of event assigned by this SharedBuffer
-	 * @param version   Version of the previous relation
-	 * @return assigned id of this entry
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	public NodeId put(
-			final String stateName,
-			final EventId eventId,
-			final DeweyNumber version) throws Exception {
-
-		return put(stateName, eventId, null, version);
-	}
-
-	/**
 	 * Stores given value (value + timestamp) under the given state. It assigns a preceding element
 	 * relation to the previous entry.
 	 *
-	 * @param stateName     name of the state that the event should be assigned to
-	 * @param eventId       unique id of event assigned by this SharedBuffer
-	 * @param previousNodeId id of previous entry
-	 * @param version       Version of the previous relation
+	 * @param stateName      name of the state that the event should be assigned to
+	 * @param eventId        unique id of event assigned by this SharedBuffer
+	 * @param previousNodeId id of previous entry (might be null if start of new run)
+	 * @param version        Version of the previous relation
 	 * @return assigned id of this element
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
@@ -179,7 +181,7 @@ public class SharedBuffer<V> {
 		}
 
 		NodeId currentNodeId = new NodeId(eventId, getOriginalNameFromInternal(stateName));
-		Lockable<SharedBufferNode> currentNode = pages.get(currentNodeId);
+		Lockable<SharedBufferNode> currentNode = entries.get(currentNodeId);
 		if (currentNode == null) {
 			currentNode = new Lockable<>(new SharedBufferNode(), 0);
 			lockEvent(eventId);
@@ -188,7 +190,7 @@ public class SharedBuffer<V> {
 		currentNode.getElement().addEdge(new SharedBufferEdge(
 			previousNodeId,
 			version));
-		pages.put(currentNodeId, currentNode);
+		entries.put(currentNodeId, currentNode);
 
 		return currentNodeId;
 	}
@@ -221,7 +223,7 @@ public class SharedBuffer<V> {
 		Stack<ExtractionState> extractionStates = new Stack<>();
 
 		// get the starting shared buffer entry for the previous relation
-		Lockable<SharedBufferNode> entryLock = pages.get(nodeId);
+		Lockable<SharedBufferNode> entryLock = entries.get(nodeId);
 
 		if (entryLock != null) {
 			SharedBufferNode entry = entryLock.getElement();
@@ -271,7 +273,7 @@ public class SharedBuffer<V> {
 							}
 
 							extractionStates.push(new ExtractionState(
-								target != null ? Tuple2.of(target, pages.get(target).getElement()) : null,
+								target != null ? Tuple2.of(target, entries.get(target).getElement()) : null,
 								edge.getDeweyNumber(),
 								newPath));
 						}
@@ -291,10 +293,10 @@ public class SharedBuffer<V> {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void lockNode(final NodeId node) throws Exception {
-		Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+		Lockable<SharedBufferNode> sharedBufferNode = entries.get(node);
 		if (sharedBufferNode != null) {
 			sharedBufferNode.lock();
-			pages.put(node, sharedBufferNode);
+			entries.put(node, sharedBufferNode);
 		}
 	}
 
@@ -306,18 +308,18 @@ public class SharedBuffer<V> {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public void releaseNode(final NodeId node) throws Exception {
-		Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+		Lockable<SharedBufferNode> sharedBufferNode = entries.get(node);
 		if (sharedBufferNode != null) {
 			if (sharedBufferNode.release()) {
 				removeNode(node, sharedBufferNode.getElement());
 			} else {
-				pages.put(node, sharedBufferNode);
+				entries.put(node, sharedBufferNode);
 			}
 		}
 	}
 
 	private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
-		pages.remove(node);
+		entries.remove(node);
 		EventId eventId = node.getEventId();
 		releaseEvent(eventId);
 
@@ -392,4 +394,9 @@ public class SharedBuffer<V> {
 		}
 	}
 
+	@VisibleForTesting
+	Iterator<Map.Entry<Long, Integer>> getEventCounters() throws Exception {
+		return eventsCount.iterator();
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 73a9200..1e482c3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -192,7 +192,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 			if (comparator == null) {
 				// there can be no out of order elements in processing time
 				NFAState nfaState = getNFAState();
-				processEvent(nfaState, element.getValue(), getProcessingTimeService().getCurrentProcessingTime());
+				long timestamp = getProcessingTimeService().getCurrentProcessingTime();
+				advanceTime(nfaState, timestamp);
+				processEvent(nfaState, element.getValue(), timestamp);
 				updateNFA(nfaState);
 			} else {
 				long currentTime = timerService.currentProcessingTime();
@@ -272,15 +274,16 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 		// STEP 2
 		while (!sortedTimestamps.isEmpty() && sortedTimestamps.peek() <= timerService.currentWatermark()) {
 			long timestamp = sortedTimestamps.poll();
+			advanceTime(nfaState, timestamp);
 			try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
 				elements.forEachOrdered(
-						event -> {
-							try {
-								processEvent(nfaState, event, timestamp);
-							} catch (Exception e) {
-								throw new RuntimeException(e);
-							}
+					event -> {
+						try {
+							processEvent(nfaState, event, timestamp);
+						} catch (Exception e) {
+							throw new RuntimeException(e);
 						}
+					}
 				);
 			}
 			elementQueueState.remove(timestamp);
@@ -318,15 +321,16 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 		// STEP 2
 		while (!sortedTimestamps.isEmpty()) {
 			long timestamp = sortedTimestamps.poll();
+			advanceTime(nfa, timestamp);
 			try (Stream<IN> elements = sort(elementQueueState.get(timestamp))) {
 				elements.forEachOrdered(
-						event -> {
-							try {
-								processEvent(nfa, event, timestamp);
-							} catch (Exception e) {
-								throw new RuntimeException(e);
-							}
+					event -> {
+						try {
+							processEvent(nfa, event, timestamp);
+						} catch (Exception e) {
+							throw new RuntimeException(e);
 						}
+					}
 				);
 			}
 			elementQueueState.remove(timestamp);
@@ -377,18 +381,19 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 	 * @param timestamp The timestamp of the event
 	 */
 	private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
-		Tuple2<Collection<Map<String, List<IN>>>, Collection<Tuple2<Map<String, List<IN>>, Long>>> patterns =
+		Collection<Map<String, List<IN>>> patterns =
 				nfa.process(partialMatches, nfaState, event, timestamp, afterMatchSkipStrategy);
-		processMatchedSequences(patterns.f0, timestamp);
-		processTimedOutSequences(patterns.f1, timestamp);
+		processMatchedSequences(patterns, timestamp);
 	}
 
 	/**
-	 * Advances the time for the given NFA to the given timestamp. This can lead to pruning and
-	 * timeouts.
+	 * Advances the time for the given NFA to the given timestamp. This means that no more events with timestamp
+	 * <b>lower</b> than the given timestamp should be passed to the nfa, This can lead to pruning and timeouts.
 	 */
 	private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
-		processEvent(nfaState, null, timestamp);
+		Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
+			nfa.advanceTime(partialMatches, nfaState, timestamp);
+		processTimedOutSequences(timedOut, timestamp);
 	}
 
 	protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 7e904c8..ae68d02 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -35,6 +35,7 @@ import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -49,6 +50,7 @@ import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
 import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
 import static org.apache.flink.cep.utils.NFAUtils.compile;
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
 
 /**
  * General tests for {@link NFA} features. See also {@link IterativeConditionsITCase}, {@link NotPatternITCase},
@@ -399,11 +401,11 @@ public class NFAITCase extends TestLogger {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		for (StreamRecord<Event> event: events) {
-			Tuple2<Collection<Map<String, List<Event>>>, Collection<Tuple2<Map<String, List<Event>>, Long>>> patterns =
-					nfa.process(sharedBuffer, nfaState, event.getValue(), event.getTimestamp());
 
-			Collection<Map<String, List<Event>>> matchedPatterns = patterns.f0;
-			Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns = patterns.f1;
+			Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
+				nfa.advanceTime(sharedBuffer, nfaState, event.getTimestamp());
+			Collection<Map<String, List<Event>>> matchedPatterns =
+				nfa.process(sharedBuffer, nfaState, event.getValue(), event.getTimestamp());
 
 			resultingPatterns.addAll(matchedPatterns);
 			resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2338,7 +2340,7 @@ public class NFAITCase extends TestLogger {
 		nfa.process(sharedBuffer, nfaState, end1, 6);
 
 		//pruning element
-		nfa.process(sharedBuffer, nfaState, null, 10);
+		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
 		assertEquals(1, nfaState.getComputationStates().size());
 		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2382,7 +2384,7 @@ public class NFAITCase extends TestLogger {
 		nfa.process(sharedBuffer, nfaState, end1, 6);
 
 		//pruning element
-		nfa.process(sharedBuffer, nfaState, null, 10);
+		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
 		assertEquals(1, nfaState.getComputationStates().size());
 		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2428,7 +2430,7 @@ public class NFAITCase extends TestLogger {
 		nfa.process(sharedBuffer, nfaState, end1, 6);
 
 		//pruning element
-		nfa.process(sharedBuffer, nfaState, null, 10);
+		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
 		assertEquals(1, nfaState.getComputationStates().size());
 		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2474,7 +2476,7 @@ public class NFAITCase extends TestLogger {
 		nfa.process(sharedBuffer, nfaState, end1, 6);
 
 		//pruning element
-		nfa.process(sharedBuffer, nfaState, null, 10);
+		nfa.advanceTime(sharedBuffer, nfaState, 10);
 
 		assertEquals(1, nfaState.getComputationStates().size());
 		assertEquals("start", nfaState.getComputationStates().peek().getCurrentStateName());
@@ -2734,7 +2736,7 @@ public class NFAITCase extends TestLogger {
 				sharedBuffer,
 				nfaState,
 				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
+				inputEvent.getTimestamp());
 
 			resultingPatterns.addAll(patterns);
 		}
@@ -2809,7 +2811,7 @@ public class NFAITCase extends TestLogger {
 				sharedBuffer,
 				nfaState,
 				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
+				inputEvent.getTimestamp());
 
 			resultingPatterns.addAll(patterns);
 		}
@@ -2826,4 +2828,22 @@ public class NFAITCase extends TestLogger {
 
 		Assert.assertEquals(expectedOrder, resultOrder);
 	}
+
+	@Test
+	public void testSharedBufferClearing() throws Exception {
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedBy("end");
+
+		Event a = new Event(40, "a", 1.0);
+		Event b = new Event(41, "b", 2.0);
+
+		SharedBuffer<Event> spiedBuffer = Mockito.spy(sharedBuffer);
+		NFA<Event> nfa = compile(pattern, false);
+
+		nfa.process(spiedBuffer, nfa.createInitialNFAState(), a, 1);
+		nfa.process(spiedBuffer, nfa.createInitialNFAState(), b, 2);
+		Mockito.verify(spiedBuffer, Mockito.never()).advanceTime(anyLong());
+		nfa.advanceTime(spiedBuffer, nfa.createInitialNFAState(), 2);
+		Mockito.verify(spiedBuffer, Mockito.times(1)).advanceTime(2);
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index ab9c9b1..ea20bee 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -117,13 +117,13 @@ public class NFAStatusChangeITCase {
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		// as the timestamp is within the window
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, null, 8L);
+		nfa.advanceTime(sharedBuffer, nfaState, 8L);
 		assertFalse("NFA status should not change as the timestamp is within the window", nfaState.isStateChanged());
 
 		// timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will
 		// be removed from eventSharedBuffer as the timeout happens
 		nfaState.resetStateChanged();
-		Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.process(sharedBuffer, nfaState, null, 12L).f1;
+		Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.advanceTime(sharedBuffer, nfaState, 12L);
 		assertTrue("NFA status should change as timeout happens", nfaState.isStateChanged() && !timeoutResults.isEmpty());
 	}
 
@@ -183,10 +183,11 @@ public class NFAStatusChangeITCase {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		nfaState.resetStateChanged();
+		nfa.advanceTime(sharedBuffer, nfaState, 6L);
 		nfa.process(sharedBuffer, nfaState, new Event(6, "start", 1.0), 6L);
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(6, "end", 1.0), 17L);
+		nfa.advanceTime(sharedBuffer, nfaState, 17L);
 		assertTrue(nfaState.isStateChanged());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 646455f..e626b50 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -185,11 +185,12 @@ public class NFATest extends TestLogger {
 
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
 		for (StreamRecord<Event> streamEvent : inputs) {
+			nfa.advanceTime(sharedBuffer, nfaState, streamEvent.getTimestamp());
 			Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
 				sharedBuffer,
 				nfaState,
 				streamEvent.getValue(),
-				streamEvent.getTimestamp()).f0;
+				streamEvent.getTimestamp());
 
 			actualPatterns.addAll(matchedPatterns);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
index 00a3bfd..58ba224 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -66,12 +66,13 @@ public class NFATestUtilities {
 
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
 		for (StreamRecord<Event> inputEvent : inputEvents) {
+			nfa.advanceTime(sharedBuffer, nfaState, inputEvent.getTimestamp());
 			Collection<Map<String, List<Event>>> patterns = nfa.process(
 				sharedBuffer,
 				nfaState,
 				inputEvent.getValue(),
 				inputEvent.getTimestamp(),
-				afterMatchSkipStrategy).f0;
+				afterMatchSkipStrategy);
 
 			for (Map<String, List<Event>> p: patterns) {
 				List<Event> res = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
index 9be7cc1..342c9ef 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -29,6 +29,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -93,9 +94,9 @@ public class SharedBufferTest extends TestLogger {
 		expectedPattern3.put("b", new ArrayList<>());
 		expectedPattern3.get("b").add(events[7]);
 
-		NodeId a10 = sharedBuffer.put("a1", eventIds[0], DeweyNumber.fromString("1"));
+		NodeId a10 = sharedBuffer.put("a1", eventIds[0], null, DeweyNumber.fromString("1"));
 		NodeId aLoop0 = sharedBuffer.put("a[]", eventIds[1], a10, DeweyNumber.fromString("1.0"));
-		NodeId a11 = sharedBuffer.put("a1", eventIds[2], DeweyNumber.fromString("2"));
+		NodeId a11 = sharedBuffer.put("a1", eventIds[2], null, DeweyNumber.fromString("2"));
 		NodeId aLoop1 = sharedBuffer.put("a[]", eventIds[2], aLoop0, DeweyNumber.fromString("1.0"));
 		NodeId aLoop2 = sharedBuffer.put("a[]", eventIds[3], aLoop1, DeweyNumber.fromString("1.0"));
 		NodeId aSecondLoop0 = sharedBuffer.put("a[]", eventIds[3], a11, DeweyNumber.fromString("2.0"));
@@ -106,12 +107,16 @@ public class SharedBufferTest extends TestLogger {
 		NodeId aLoop5 = sharedBuffer.put("a[]", eventIds[6], aLoop4, DeweyNumber.fromString("1.1"));
 		NodeId b3 = sharedBuffer.put("b", eventIds[7], aLoop5, DeweyNumber.fromString("1.1.0"));
 
-		Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+		Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns(b3,
+			DeweyNumber.fromString("1.1.0"));
 		sharedBuffer.releaseNode(b3);
-		Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns(b3, DeweyNumber.fromString("1.1.0"));
+		Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns(b3,
+			DeweyNumber.fromString("1.1.0"));
 
-		Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns(b1, DeweyNumber.fromString("2.0.0"));
-		Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns(b0, DeweyNumber.fromString("1.0.0"));
+		Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns(b1,
+			DeweyNumber.fromString("2.0.0"));
+		Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns(b0,
+			DeweyNumber.fromString("1.0.0"));
 		sharedBuffer.releaseNode(b0);
 		sharedBuffer.releaseNode(b1);
 
@@ -144,7 +149,7 @@ public class SharedBufferTest extends TestLogger {
 			eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
 		}
 
-		NodeId start = sharedBuffer.put("start", eventIds[1], DeweyNumber.fromString("1"));
+		NodeId start = sharedBuffer.put("start", eventIds[1], null, DeweyNumber.fromString("1"));
 		NodeId b0 = sharedBuffer.put("branching", eventIds[2], start, DeweyNumber.fromString("1.0"));
 		NodeId b1 = sharedBuffer.put("branching", eventIds[3], start, DeweyNumber.fromString("1.1"));
 		NodeId b00 = sharedBuffer.put("branching", eventIds[3], b0, DeweyNumber.fromString("1.0.0"));
@@ -189,7 +194,7 @@ public class SharedBufferTest extends TestLogger {
 		expectedResult.put("c", new ArrayList<>());
 		expectedResult.get("c").add(events[4]);
 
-		NodeId a = sharedBuffer.put("a", eventIds[0], DeweyNumber.fromString("1"));
+		NodeId a = sharedBuffer.put("a", eventIds[0], null, DeweyNumber.fromString("1"));
 		NodeId b = sharedBuffer.put("b", eventIds[1], a, DeweyNumber.fromString("1.0"));
 		NodeId aa = sharedBuffer.put("aa", eventIds[2], b, DeweyNumber.fromString("1.0.0"));
 		NodeId bb = sharedBuffer.put("bb", eventIds[3], aa, DeweyNumber.fromString("1.0.0.0"));
@@ -213,4 +218,24 @@ public class SharedBufferTest extends TestLogger {
 		assertEquals(expectedOrder, resultOrder);
 	}
 
+	@Test
+	public void testSharedBufferCountersClearing() throws Exception {
+		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		int numberEvents = 4;
+		Event[] events = new Event[numberEvents];
+
+		for (int i = 0; i < numberEvents; i++) {
+			events[i] = new Event(i + 1, "e" + (i + 1), i);
+			sharedBuffer.registerEvent(events[i], i);
+		}
+
+		sharedBuffer.advanceTime(3);
+
+		Iterator<Map.Entry<Long, Integer>> counters = sharedBuffer.getEventCounters();
+		Map.Entry<Long, Integer> entry = counters.next();
+		assertEquals(3, entry.getKey().longValue());
+		assertEquals(1, entry.getValue().intValue());
+		assertFalse(counters.hasNext());
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
index 2c7b979..4d510cf 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/TestSharedBuffer.java
@@ -245,6 +245,12 @@ public class TestSharedBuffer<V> extends SharedBuffer<V> {
 				stateReads++;
 				return iterator.next();
 			}
+
+			@Override
+			public void remove() {
+				stateWrites++;
+				iterator.remove();
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
index 5adce4d..b125de9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyedStateFunction.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.state.State;
@@ -22,8 +23,8 @@ import org.apache.flink.api.common.state.State;
 /**
  * A function to be applied to all keyed states.
  *
- * <p>This functionality is only available through the
- * {@code BroadcastConnectedStream.process(final KeyedBroadcastProcessFunction function)}.
+ * @param <K> The type of key.
+ * @param <S> The type of state.
  */
 public abstract class KeyedStateFunction<K, S extends State> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/05ee3ce9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index c193416..9915dd5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -396,8 +396,6 @@ public abstract class AbstractStreamOperator<OUT>
 		return snapshotInProgress;
 	}
 
-
-
 	/**
 	 * Stream operators with state, which want to participate in a snapshot need to override this hook method.
 	 *