You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/08/29 15:08:59 UTC

[GitHub] dawidwys closed pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process

dawidwys closed pull request #6205: [FLINK-9642]Reduce the count to deal with state during a CEP process
URL: https://github.com/apache/flink/pull/6205
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 815b25ad153..b355c38e858 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -33,6 +33,7 @@
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
@@ -175,7 +176,7 @@ private boolean isFinalState(ComputationState state) {
 	 * <p>If computations reach a stop state, the path forward is discarded and currently constructed path is returned
 	 * with the element that resulted in the stop state.
 	 *
-	 * @param sharedBuffer the SharedBuffer object that we need to work upon while processing
+	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
 	 * @param nfaState The NFAState object that we need to affect while processing
 	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
@@ -185,11 +186,11 @@ private boolean isFinalState(ComputationState state) {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public Collection<Map<String, List<T>>> process(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final T event,
 			final long timestamp) throws Exception {
-		return process(sharedBuffer, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
+		return process(sharedBufferAccessor, nfaState, event, timestamp, AfterMatchSkipStrategy.noSkip());
 	}
 
 	/**
@@ -200,7 +201,7 @@ private boolean isFinalState(ComputationState state) {
 	 * <p>If computations reach a stop state, the path forward is discarded and currently constructed path is returned
 	 * with the element that resulted in the stop state.
 	 *
-	 * @param sharedBuffer the SharedBuffer object that we need to work upon while processing
+	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
 	 * @param nfaState The NFAState object that we need to affect while processing
 	 * @param event The current event to be processed or null if only pruning shall be done
 	 * @param timestamp The timestamp of the current event
@@ -211,13 +212,13 @@ private boolean isFinalState(ComputationState state) {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public Collection<Map<String, List<T>>> process(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final T event,
 			final long timestamp,
 			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
-		try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBuffer)) {
-			return doProcess(sharedBuffer, nfaState, eventWrapper, afterMatchSkipStrategy);
+		try (EventWrapper eventWrapper = new EventWrapper(event, timestamp, sharedBufferAccessor)) {
+			return doProcess(sharedBufferAccessor, nfaState, eventWrapper, afterMatchSkipStrategy);
 		}
 	}
 
@@ -225,33 +226,32 @@ private boolean isFinalState(ComputationState state) {
 	 * Prunes states assuming there will be no events with timestamp <b>lower</b> than the given one.
 	 * It cleares the sharedBuffer and also emits all timed out partial matches.
 	 *
-	 * @param sharedBuffer the SharedBuffer object that we need to work upon while processing
+	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
 	 * @param nfaState     The NFAState object that we need to affect while processing
 	 * @param timestamp    timestamp that indicates that there will be no more events with lower timestamp
 	 * @return all timed outed partial matches
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public Collection<Tuple2<Map<String, List<T>>, Long>> advanceTime(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final long timestamp) throws Exception {
 
 		final Collection<Tuple2<Map<String, List<T>>, Long>> timeoutResult = new ArrayList<>();
 		final PriorityQueue<ComputationState> newPartialMatches = new PriorityQueue<>(NFAState.COMPUTATION_STATE_COMPARATOR);
 
-		Map<EventId, T> eventsCache = new HashMap<>();
 		for (ComputationState computationState : nfaState.getPartialMatches()) {
 			if (isStateTimedOut(computationState, timestamp)) {
 
 				if (handleTimeout) {
 					// extract the timed out event pattern
-					Map<String, List<T>> timedOutPattern = sharedBuffer.materializeMatch(extractCurrentMatches(
-						sharedBuffer,
-						computationState), eventsCache);
+					Map<String, List<T>> timedOutPattern = sharedBufferAccessor.materializeMatch(extractCurrentMatches(
+						sharedBufferAccessor,
+						computationState));
 					timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
 				}
 
-				sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
+				sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
 
 				nfaState.setStateChanged();
 			} else {
@@ -261,9 +261,10 @@ private boolean isFinalState(ComputationState state) {
 
 		nfaState.setNewPartialMatches(newPartialMatches);
 
-		sharedBuffer.advanceTime(timestamp);
+		sharedBufferAccessor.advanceTime(timestamp);
 
 		return timeoutResult;
+
 	}
 
 	private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
@@ -271,7 +272,7 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
 	}
 
 	private Collection<Map<String, List<T>>> doProcess(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final NFAState nfaState,
 			final EventWrapper event,
 			final AfterMatchSkipStrategy afterMatchSkipStrategy) throws Exception {
@@ -282,7 +283,7 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
 		// iterate over all current computations
 		for (ComputationState computationState : nfaState.getPartialMatches()) {
 			final Collection<ComputationState> newComputationStates = computeNextStates(
-				sharedBuffer,
+				sharedBufferAccessor,
 				computationState,
 				event,
 				event.getTimestamp());
@@ -304,7 +305,7 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
 				} else if (isStopState(newComputationState)) {
 					//reached stop state. release entry for the stop state
 					shouldDiscardPath = true;
-					sharedBuffer.releaseNode(newComputationState.getPreviousBufferEntry());
+					sharedBufferAccessor.releaseNode(newComputationState.getPreviousBufferEntry());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
 					statesToRetain.add(newComputationState);
@@ -315,7 +316,7 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
 				// a stop state was reached in this branch. release branch which results in removing previous event from
 				// the buffer
 				for (final ComputationState state : statesToRetain) {
-					sharedBuffer.releaseNode(state.getPreviousBufferEntry());
+					sharedBufferAccessor.releaseNode(state.getPreviousBufferEntry());
 				}
 			} else {
 				newPartialMatches.addAll(statesToRetain);
@@ -328,7 +329,7 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
 
 		List<Map<String, List<T>>> result = new ArrayList<>();
 		if (afterMatchSkipStrategy.isSkipStrategy()) {
-			processMatchesAccordingToSkipStrategy(sharedBuffer,
+			processMatchesAccordingToSkipStrategy(sharedBufferAccessor,
 				nfaState,
 				afterMatchSkipStrategy,
 				potentialMatches,
@@ -336,17 +337,15 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
 				result);
 		} else {
 			for (ComputationState match : potentialMatches) {
-				Map<EventId, T> eventsCache = new HashMap<>();
 				Map<String, List<T>> materializedMatch =
-					sharedBuffer.materializeMatch(
-						sharedBuffer.extractPatterns(
+					sharedBufferAccessor.materializeMatch(
+						sharedBufferAccessor.extractPatterns(
 							match.getPreviousBufferEntry(),
-							match.getVersion()).get(0),
-						eventsCache
+							match.getVersion()).get(0)
 					);
 
 				result.add(materializedMatch);
-				sharedBuffer.releaseNode(match.getPreviousBufferEntry());
+				sharedBufferAccessor.releaseNode(match.getPreviousBufferEntry());
 			}
 		}
 
@@ -356,7 +355,7 @@ private boolean isStateTimedOut(final ComputationState state, final long timesta
 	}
 
 	private void processMatchesAccordingToSkipStrategy(
-			SharedBuffer<T> sharedBuffer,
+			SharedBufferAccessor<T> sharedBufferAccessor,
 			NFAState nfaState,
 			AfterMatchSkipStrategy afterMatchSkipStrategy,
 			PriorityQueue<ComputationState> potentialMatches,
@@ -369,7 +368,6 @@ private void processMatchesAccordingToSkipStrategy(
 
 		if (earliestMatch != null) {
 
-			Map<EventId, T> eventsCache = new HashMap<>();
 			ComputationState earliestPartialMatch;
 			while (earliestMatch != null && ((earliestPartialMatch = partialMatches.peek()) == null ||
 				isEarlier(earliestMatch, earliestPartialMatch))) {
@@ -377,19 +375,19 @@ private void processMatchesAccordingToSkipStrategy(
 				nfaState.setStateChanged();
 				nfaState.getCompletedMatches().poll();
 				List<Map<String, List<EventId>>> matchedResult =
-					sharedBuffer.extractPatterns(earliestMatch.getPreviousBufferEntry(), earliestMatch.getVersion());
+					sharedBufferAccessor.extractPatterns(earliestMatch.getPreviousBufferEntry(), earliestMatch.getVersion());
 
 				afterMatchSkipStrategy.prune(
 					partialMatches,
 					matchedResult,
-					sharedBuffer);
+					sharedBufferAccessor);
 
 				afterMatchSkipStrategy.prune(
 					nfaState.getCompletedMatches(),
 					matchedResult,
-					sharedBuffer);
+					sharedBufferAccessor);
 
-				result.add(sharedBuffer.materializeMatch(matchedResult.get(0), eventsCache));
+				result.add(sharedBufferAccessor.materializeMatch(matchedResult.get(0)));
 				earliestMatch = nfaState.getCompletedMatches().peek();
 			}
 
@@ -462,19 +460,19 @@ private boolean isSelfIgnore(final StateTransition<T> edge) {
 
 		private long timestamp;
 
-		private final SharedBuffer<T> sharedBuffer;
+		private final SharedBufferAccessor<T> sharedBufferAccessor;
 
 		private EventId eventId;
 
-		EventWrapper(T event, long timestamp, SharedBuffer<T> sharedBuffer) {
+		EventWrapper(T event, long timestamp, SharedBufferAccessor<T> sharedBufferAccessor) {
 			this.event = event;
 			this.timestamp = timestamp;
-			this.sharedBuffer = sharedBuffer;
+			this.sharedBufferAccessor = sharedBufferAccessor;
 		}
 
 		EventId getEventId() throws Exception {
 			if (eventId == null) {
-				this.eventId = sharedBuffer.registerEvent(event, timestamp);
+				this.eventId = sharedBufferAccessor.registerEvent(event, timestamp);
 			}
 
 			return eventId;
@@ -491,7 +489,7 @@ public long getTimestamp() {
 		@Override
 		public void close() throws Exception {
 			if (eventId != null) {
-				sharedBuffer.releaseEvent(eventId);
+				sharedBufferAccessor.releaseEvent(eventId);
 			}
 		}
 	}
@@ -523,7 +521,7 @@ public void close() throws Exception {
 	 *     <li>Release the corresponding entries in {@link SharedBuffer}.</li>
 	 *</ol>
 	 *
-	 * @param sharedBuffer The shared buffer that we need to change
+	 * @param sharedBufferAccessor The accessor to shared buffer that we need to change
 	 * @param computationState Current computation state
 	 * @param event Current event which is processed
 	 * @param timestamp Timestamp of the current event
@@ -531,12 +529,12 @@ public void close() throws Exception {
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	private Collection<ComputationState> computeNextStates(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final ComputationState computationState,
 			final EventWrapper event,
 			final long timestamp) throws Exception {
 
-		final ConditionContext<T> context = new ConditionContext<>(this, sharedBuffer, computationState);
+		final ConditionContext<T> context = new ConditionContext<>(this, sharedBufferAccessor, computationState);
 
 		final OutgoingEdges<T> outgoingEdges = createDecisionGraph(context, computationState, event.getEvent());
 
@@ -569,7 +567,7 @@ public void close() throws Exception {
 						}
 
 						addComputationState(
-							sharedBuffer,
+							sharedBufferAccessor,
 							resultingComputationStates,
 							edge.getTargetState(),
 							computationState.getPreviousBufferEntry(),
@@ -590,7 +588,7 @@ public void close() throws Exception {
 					final DeweyNumber nextVersion = new DeweyNumber(currentVersion).addStage();
 					takeBranchesToVisit--;
 
-					final NodeId newEntry = sharedBuffer.put(
+					final NodeId newEntry = sharedBufferAccessor.put(
 						currentState.getName(),
 						event.getEventId(),
 						previousEntry,
@@ -607,7 +605,7 @@ public void close() throws Exception {
 					}
 
 					addComputationState(
-							sharedBuffer,
+							sharedBufferAccessor,
 							resultingComputationStates,
 							nextState,
 							newEntry,
@@ -619,7 +617,7 @@ public void close() throws Exception {
 					final State<T> finalState = findFinalStateAfterProceed(context, nextState, event.getEvent());
 					if (finalState != null) {
 						addComputationState(
-								sharedBuffer,
+								sharedBufferAccessor,
 								resultingComputationStates,
 								finalState,
 								newEntry,
@@ -643,14 +641,14 @@ public void close() throws Exception {
 
 		if (computationState.getPreviousBufferEntry() != null) {
 			// release the shared entry referenced by the current computation state.
-			sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
+			sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
 		}
 
 		return resultingComputationStates;
 	}
 
 	private void addComputationState(
-			SharedBuffer<T> sharedBuffer,
+			SharedBufferAccessor<T> sharedBufferAccessor,
 			List<ComputationState> computationStates,
 			State<T> currentState,
 			NodeId previousEntry,
@@ -661,7 +659,7 @@ private void addComputationState(
 				currentState.getName(), previousEntry, version, startTimestamp, startEventId);
 		computationStates.add(computationState);
 
-		sharedBuffer.lockNode(previousEntry);
+		sharedBufferAccessor.lockNode(previousEntry);
 	}
 
 	private State<T> findFinalStateAfterProceed(
@@ -747,19 +745,19 @@ private boolean checkFilterCondition(
 	 * sequence is returned as a map which contains the events and the names of the states to which
 	 * the events were mapped.
 	 *
-	 * @param sharedBuffer The {@link SharedBuffer} from which to extract the matches
+	 * @param sharedBufferAccessor The accessor to {@link SharedBuffer} from which to extract the matches
 	 * @param computationState The end computation state of the extracted event sequences
 	 * @return Collection of event sequences which end in the given computation state
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	private Map<String, List<EventId>> extractCurrentMatches(
-			final SharedBuffer<T> sharedBuffer,
+			final SharedBufferAccessor<T> sharedBufferAccessor,
 			final ComputationState computationState) throws Exception {
 		if (computationState.getPreviousBufferEntry() == null) {
 			return new HashMap<>();
 		}
 
-		List<Map<String, List<EventId>>> paths = sharedBuffer.extractPatterns(
+		List<Map<String, List<EventId>>> paths = sharedBufferAccessor.extractPatterns(
 				computationState.getPreviousBufferEntry(),
 				computationState.getVersion());
 
@@ -789,15 +787,15 @@ private boolean checkFilterCondition(
 
 		private NFA<T> nfa;
 
-		private SharedBuffer<T> sharedBuffer;
+		private SharedBufferAccessor<T> sharedBufferAccessor;
 
 		ConditionContext(
 				final NFA<T> nfa,
-				final SharedBuffer<T> sharedBuffer,
+				final SharedBufferAccessor<T> sharedBufferAccessor,
 				final ComputationState computationState) {
 			this.computationState = computationState;
 			this.nfa = nfa;
-			this.sharedBuffer = sharedBuffer;
+			this.sharedBufferAccessor = sharedBufferAccessor;
 		}
 
 		@Override
@@ -808,7 +806,7 @@ private boolean checkFilterCondition(
 			// this is to avoid any overheads when using a simple, non-iterative condition.
 
 			if (matchedEvents == null) {
-				this.matchedEvents = sharedBuffer.materializeMatch(nfa.extractCurrentMatches(sharedBuffer,
+				this.matchedEvents = sharedBufferAccessor.materializeMatch(nfa.extractCurrentMatches(sharedBufferAccessor,
 					computationState));
 			}
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index e0f399f46bf..8151a124af4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -20,7 +20,7 @@
 
 import org.apache.flink.cep.nfa.ComputationState;
 import org.apache.flink.cep.nfa.sharedbuffer.EventId;
-import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -87,13 +87,13 @@ public static AfterMatchSkipStrategy noSkip() {
 	 *
 	 * @param matchesToPrune current partial matches
 	 * @param matchedResult  already completed matches
-	 * @param sharedBuffer   corresponding shared buffer
+	 * @param sharedBufferAccessor   accessor to corresponding shared buffer
 	 * @throws Exception thrown if could not access the state
 	 */
 	public void prune(
 			Collection<ComputationState> matchesToPrune,
 			Collection<Map<String, List<EventId>>> matchedResult,
-			SharedBuffer<?> sharedBuffer) throws Exception {
+			SharedBufferAccessor<?> sharedBufferAccessor) throws Exception {
 
 		EventId pruningId = getPruningId(matchedResult);
 		if (pruningId != null) {
@@ -101,7 +101,7 @@ public void prune(
 			for (ComputationState computationState : matchesToPrune) {
 				if (computationState.getStartEventID() != null &&
 					shouldPrune(computationState.getStartEventID(), pruningId)) {
-					sharedBuffer.releaseNode(computationState.getPreviousBufferEntry());
+					sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
 					discardStates.add(computationState);
 				}
 			}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
index c35e4d7de0a..5a378972e9e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -25,28 +25,15 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.nfa.DeweyNumber;
 import org.apache.flink.util.WrappingRuntimeException;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
 
-import org.apache.commons.lang3.StringUtils;
-
-import javax.annotation.Nullable;
-
-import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
 import java.util.Map;
-import java.util.Stack;
 import java.util.stream.Collectors;
 
-import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
-import static org.apache.flink.util.Preconditions.checkState;
-
 /**
  * A shared buffer implementation which stores values under according state. Additionally, the values can be
  * versioned such that it is possible to retrieve their predecessor element in the buffer.
@@ -70,13 +57,17 @@
 	private static final String eventsStateName = "sharedBuffer-events";
 	private static final String eventsCountStateName = "sharedBuffer-events-count";
 
-	/** The buffer holding the unique events seen so far. */
 	private MapState<EventId, Lockable<V>> eventsBuffer;
-
 	/** The number of events seen so far in the stream per timestamp. */
 	private MapState<Long, Integer> eventsCount;
 	private MapState<NodeId, Lockable<SharedBufferNode>> entries;
 
+	/** The cache of eventsBuffer State. */
+	private Map<EventId, Lockable<V>> eventsBufferCache = new HashMap<>();
+
+	/** The cache of sharedBufferNode. */
+	private Map<NodeId, Lockable<SharedBufferNode>> entryCache = new HashMap<>();
+
 	public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
 		this.eventsBuffer = stateStore.getMapState(
 			new MapStateDescriptor<>(
@@ -97,46 +88,6 @@ public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerialize
 				IntSerializer.INSTANCE));
 	}
 
-	/**
-	 * Notifies shared buffer that there will be no events with timestamp &lt;&eq; the given value. I allows to clear
-	 * internal counters for number of events seen so far per timestamp.
-	 *
-	 * @param timestamp watermark, no earlier events will arrive
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	public void advanceTime(long timestamp) throws Exception {
-		Iterator<Long> iterator = eventsCount.keys().iterator();
-		while (iterator.hasNext()) {
-			Long next = iterator.next();
-			if (next < timestamp) {
-				iterator.remove();
-			}
-		}
-	}
-
-	/**
-	 * Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a
-	 * lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed
-	 * after processing all {@link org.apache.flink.cep.nfa.ComputationState}s
-	 *
-	 * <p><b>NOTE:</b>Should be called only once for each unique event!
-	 *
-	 * @param value event to be registered
-	 * @return unique id of that event that should be used when putting entries to the buffer.
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	public EventId registerEvent(V value, long timestamp) throws Exception {
-		Integer id = eventsCount.get(timestamp);
-		if (id == null) {
-			id = 0;
-		}
-
-		EventId eventId = new EventId(id, timestamp);
-		eventsBuffer.put(eventId, new Lockable<>(value, 1));
-		eventsCount.put(timestamp, id + 1);
-		return eventId;
-	}
-
 	/**
 	 * Initializes underlying state with given map of events and entries. Should be used only in case of migration from
 	 * old state.
@@ -162,39 +113,34 @@ public void init(
 	}
 
 	/**
-	 * Stores given value (value + timestamp) under the given state. It assigns a preceding element
-	 * relation to the previous entry.
+	 * Construct an accessor to deal with this sharedBuffer.
 	 *
-	 * @param stateName      name of the state that the event should be assigned to
-	 * @param eventId        unique id of event assigned by this SharedBuffer
-	 * @param previousNodeId id of previous entry (might be null if start of new run)
-	 * @param version        Version of the previous relation
-	 * @return assigned id of this element
-	 * @throws Exception Thrown if the system cannot access the state.
+	 * @return an accessor to deal with this sharedBuffer.
 	 */
-	public NodeId put(
-			final String stateName,
-			final EventId eventId,
-			@Nullable final NodeId previousNodeId,
-			final DeweyNumber version) throws Exception {
+	public SharedBufferAccessor<V> getAccessor() {
+		return new SharedBufferAccessor<>(this);
+	}
 
-		if (previousNodeId != null) {
-			lockNode(previousNodeId);
+	void advanceTime(long timestamp) throws Exception {
+		Iterator<Long> iterator = eventsCount.keys().iterator();
+		while (iterator.hasNext()) {
+			Long next = iterator.next();
+			if (next < timestamp) {
+				iterator.remove();
+			}
 		}
+	}
 
-		NodeId currentNodeId = new NodeId(eventId, getOriginalNameFromInternal(stateName));
-		Lockable<SharedBufferNode> currentNode = entries.get(currentNodeId);
-		if (currentNode == null) {
-			currentNode = new Lockable<>(new SharedBufferNode(), 0);
-			lockEvent(eventId);
+	EventId registerEvent(V value, long timestamp) throws Exception {
+		Integer id = eventsCount.get(timestamp);
+		if (id == null) {
+			id = 0;
 		}
-
-		currentNode.getElement().addEdge(new SharedBufferEdge(
-			previousNodeId,
-			version));
-		entries.put(currentNodeId, currentNode);
-
-		return currentNodeId;
+		EventId eventId = new EventId(id, timestamp);
+		Lockable<V> lockableValue = new Lockable<>(value, 1);
+		eventsCount.put(timestamp, id + 1);
+		eventsBufferCache.put(eventId, lockableValue);
+		return eventId;
 	}
 
 	/**
@@ -204,227 +150,120 @@ public NodeId put(
 	 * @throws Exception Thrown if the system cannot access the state.
 	 */
 	public boolean isEmpty() throws Exception {
-		return Iterables.isEmpty(eventsBuffer.keys());
+		return Iterables.isEmpty(eventsBufferCache.keySet()) && Iterables.isEmpty(eventsBuffer.keys());
 	}
 
 	/**
-	 * Returns all elements from the previous relation starting at the given entry.
+	 * Inserts or updates an event in cache.
 	 *
-	 * @param nodeId  id of the starting entry
-	 * @param version Version of the previous relation which shall be extracted
-	 * @return Collection of previous relations starting with the given value
-	 * @throws Exception Thrown if the system cannot access the state.
+	 * @param eventId id of the event
+	 * @param event event body
 	 */
-	public List<Map<String, List<EventId>>> extractPatterns(
-			final NodeId nodeId,
-			final DeweyNumber version) throws Exception {
-
-		List<Map<String, List<EventId>>> result = new ArrayList<>();
-
-		// stack to remember the current extraction states
-		Stack<ExtractionState> extractionStates = new Stack<>();
-
-		// get the starting shared buffer entry for the previous relation
-		Lockable<SharedBufferNode> entryLock = entries.get(nodeId);
-
-		if (entryLock != null) {
-			SharedBufferNode entry = entryLock.getElement();
-			extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
-
-			// use a depth first search to reconstruct the previous relations
-			while (!extractionStates.isEmpty()) {
-				final ExtractionState extractionState = extractionStates.pop();
-				// current path of the depth first search
-				final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath();
-				final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry();
-
-				// termination criterion
-				if (currentEntry == null) {
-					final Map<String, List<EventId>> completePath = new LinkedHashMap<>();
-
-					while (!currentPath.isEmpty()) {
-						final NodeId currentPathEntry = currentPath.pop().f0;
-
-						String page = currentPathEntry.getPageName();
-						List<EventId> values = completePath
-							.computeIfAbsent(page, k -> new ArrayList<>());
-						values.add(currentPathEntry.getEventId());
-					}
-					result.add(completePath);
-				} else {
-
-					// append state to the path
-					currentPath.push(currentEntry);
-
-					boolean firstMatch = true;
-					for (SharedBufferEdge edge : currentEntry.f1.getEdges()) {
-						// we can only proceed if the current version is compatible to the version
-						// of this previous relation
-						final DeweyNumber currentVersion = extractionState.getVersion();
-						if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
-							final NodeId target = edge.getTarget();
-							Stack<Tuple2<NodeId, SharedBufferNode>> newPath;
-
-							if (firstMatch) {
-								// for the first match we don't have to copy the current path
-								newPath = currentPath;
-								firstMatch = false;
-							} else {
-								newPath = new Stack<>();
-								newPath.addAll(currentPath);
-							}
-
-							extractionStates.push(new ExtractionState(
-								target != null ? Tuple2.of(target, entries.get(target).getElement()) : null,
-								edge.getDeweyNumber(),
-								newPath));
-						}
-					}
-				}
-
-			}
-		}
-		return result;
+	void upsertEvent(EventId eventId, Lockable<V> event) {
+		this.eventsBufferCache.put(eventId, event);
 	}
 
-	public Map<String, List<V>> materializeMatch(Map<String, List<EventId>> match) {
-		return materializeMatch(match, new HashMap<>());
+	/**
+	 * Inserts or updates a shareBufferNode in cache.
+	 *
+	 * @param nodeId id of the event
+	 * @param entry SharedBufferNode
+	 */
+	void upsertEntry(NodeId nodeId, Lockable<SharedBufferNode> entry) {
+		this.entryCache.put(nodeId, entry);
 	}
 
-	public Map<String, List<V>> materializeMatch(Map<String, List<EventId>> match, Map<EventId, V> cache) {
-
-		Map<String, List<V>> materializedMatch = new LinkedHashMap<>(match.size());
-
-		for (Map.Entry<String, List<EventId>> pattern : match.entrySet()) {
-			List<V> events = new ArrayList<>(pattern.getValue().size());
-			for (EventId eventId : pattern.getValue()) {
-				V event = cache.computeIfAbsent(eventId, id -> {
-					try {
-						return eventsBuffer.get(id).getElement();
-					} catch (Exception ex) {
-						throw new WrappingRuntimeException(ex);
-					}
-				});
-				events.add(event);
-			}
-			materializedMatch.put(pattern.getKey(), events);
-		}
-
-		return materializedMatch;
+	/**
+	 * Removes an event from cache and state.
+	 *
+	 * @param eventId id of the event
+	 */
+	void removeEvent(EventId eventId) throws Exception {
+		this.eventsBufferCache.remove(eventId);
+		this.eventsBuffer.remove(eventId);
 	}
 
 	/**
-	 * Increases the reference counter for the given entry so that it is not
-	 * accidentally removed.
+	 * Removes a ShareBufferNode from cache and state.
 	 *
-	 * @param node id of the entry
-	 * @throws Exception Thrown if the system cannot access the state.
+	 * @param nodeId id of the event
 	 */
-	public void lockNode(final NodeId node) throws Exception {
-		Lockable<SharedBufferNode> sharedBufferNode = entries.get(node);
-		if (sharedBufferNode != null) {
-			sharedBufferNode.lock();
-			entries.put(node, sharedBufferNode);
-		}
+	void removeEntry(NodeId nodeId) throws Exception {
+		this.entryCache.remove(nodeId);
+		this.entries.remove(nodeId);
 	}
 
 	/**
-	 * Decreases the reference counter for the given entry so that it can be
-	 * removed once the reference counter reaches 0.
+	 * It always returns node either from state or cache.
 	 *
-	 * @param node id of the entry
-	 * @throws Exception Thrown if the system cannot access the state.
+	 * @param nodeId id of the node
+	 * @return SharedBufferNode
 	 */
-	public void releaseNode(final NodeId node) throws Exception {
-		Lockable<SharedBufferNode> sharedBufferNode = entries.get(node);
-		if (sharedBufferNode != null) {
-			if (sharedBufferNode.release()) {
-				removeNode(node, sharedBufferNode.getElement());
-			} else {
-				entries.put(node, sharedBufferNode);
+	Lockable<SharedBufferNode> getEntry(NodeId nodeId) {
+		return entryCache.computeIfAbsent(nodeId, id -> {
+			try {
+				return entries.get(id);
+			} catch (Exception ex) {
+				throw new WrappingRuntimeException(ex);
 			}
-		}
-	}
-
-	private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
-		entries.remove(node);
-		EventId eventId = node.getEventId();
-		releaseEvent(eventId);
-
-		for (SharedBufferEdge sharedBufferEdge : sharedBufferNode.getEdges()) {
-			releaseNode(sharedBufferEdge.getTarget());
-		}
-	}
-
-	private void lockEvent(EventId eventId) throws Exception {
-		Lockable<V> eventWrapper = eventsBuffer.get(eventId);
-		checkState(
-			eventWrapper != null,
-			"Referring to non existent event with id %s",
-			eventId);
-		eventWrapper.lock();
-		eventsBuffer.put(eventId, eventWrapper);
+		});
 	}
 
 	/**
-	 * Decreases the reference counter for the given event so that it can be
-	 * removed once the reference counter reaches 0.
+	 * It always returns event either from state or cache.
 	 *
 	 * @param eventId id of the event
-	 * @throws Exception Thrown if the system cannot access the state.
+	 * @return event
 	 */
-	public void releaseEvent(EventId eventId) throws Exception {
-		Lockable<V> eventWrapper = eventsBuffer.get(eventId);
-		if (eventWrapper != null) {
-			if (eventWrapper.release()) {
-				eventsBuffer.remove(eventId);
-			} else {
-				eventsBuffer.put(eventId, eventWrapper);
+	Lockable<V> getEvent(EventId eventId) {
+		return eventsBufferCache.computeIfAbsent(eventId, id -> {
+			try {
+				return eventsBuffer.get(id);
+			} catch (Exception ex) {
+				throw new WrappingRuntimeException(ex);
 			}
-		}
+		});
 	}
 
 	/**
-	 * Helper class to store the extraction state while extracting a sequence of values following
-	 * the versioned entry edges.
+	 * Flush the event and node from cache to state.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
 	 */
-	private static class ExtractionState {
-
-		private final Tuple2<NodeId, SharedBufferNode> entry;
-		private final DeweyNumber version;
-		private final Stack<Tuple2<NodeId, SharedBufferNode>> path;
-
-		ExtractionState(
-				final Tuple2<NodeId, SharedBufferNode> entry,
-				final DeweyNumber version,
-				final Stack<Tuple2<NodeId, SharedBufferNode>> path) {
-			this.entry = entry;
-			this.version = version;
-			this.path = path;
+	void flushCache() throws Exception {
+		if (!entryCache.isEmpty()) {
+			entries.putAll(entryCache);
+			entryCache.clear();
 		}
-
-		public Tuple2<NodeId, SharedBufferNode> getEntry() {
-			return entry;
+		if (!eventsBufferCache.isEmpty()) {
+			eventsBuffer.putAll(eventsBufferCache);
+			eventsBufferCache.clear();
 		}
+	}
 
-		public Stack<Tuple2<NodeId, SharedBufferNode>> getPath() {
-			return path;
-		}
+	@VisibleForTesting
+	Iterator<Map.Entry<Long, Integer>> getEventCounters() throws Exception {
+		return eventsCount.iterator();
+	}
 
-		public DeweyNumber getVersion() {
-			return version;
-		}
+	@VisibleForTesting
+	public int getEventsBufferCacheSize() {
+		return eventsBufferCache.size();
+	}
 
-		@Override
-		public String toString() {
-			return "ExtractionState(" + entry + ", " + version + ", [" +
-				StringUtils.join(path, ", ") + "])";
-		}
+	@VisibleForTesting
+	public int getEventsBufferSize() throws Exception {
+		return Iterables.size(eventsBuffer.entries());
 	}
 
 	@VisibleForTesting
-	Iterator<Map.Entry<Long, Integer>> getEventCounters() throws Exception {
-		return eventsCount.iterator();
+	public int getSharedBufferNodeSize() throws Exception {
+		return Iterables.size(entries.entries());
+	}
+
+	@VisibleForTesting
+	public int getSharedBufferNodeCacheSize() throws Exception {
+		return entryCache.size();
 	}
 
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
new file mode 100644
index 00000000000..2613f9dc1d1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferAccessor.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOVICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Vhe ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.util.WrappingRuntimeException;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+
+import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Accessor to SharedBuffer that allows operations on the underlying structures in batches.
+ * Operations are persisted only after closing the Accessor.
+ */
+public class SharedBufferAccessor<V> implements AutoCloseable {
+
+	/** The sharedBuffer to store the partial matched events.*/
+	private SharedBuffer<V> sharedBuffer;
+
+	SharedBufferAccessor(SharedBuffer<V> sharedBuffer) {
+		this.sharedBuffer = sharedBuffer;
+	}
+
+	/**
+	 * Notifies shared buffer that there will be no events with timestamp &lt;&eq; the given value. It allows to clear
+	 * internal counters for number of events seen so far per timestamp.
+	 *
+	 * @param timestamp watermark, no earlier events will arrive
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void advanceTime(long timestamp) throws Exception {
+		sharedBuffer.advanceTime(timestamp);
+	}
+
+	/**
+	 * Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a
+	 * lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed
+	 * after processing all {@link org.apache.flink.cep.nfa.ComputationState}s
+	 *
+	 * <p><b>NOTE:</b>Should be called only once for each unique event!
+	 *
+	 * @param value event to be registered
+	 * @return unique id of that event that should be used when putting entries to the buffer.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public EventId registerEvent(V value, long timestamp) throws Exception {
+		return sharedBuffer.registerEvent(value, timestamp);
+	}
+
+	/**
+	 * Stores given value (value + timestamp) under the given state. It assigns a preceding element
+	 * relation to the previous entry.
+	 *
+	 * @param stateName      name of the state that the event should be assigned to
+	 * @param eventId        unique id of event assigned by this SharedBuffer
+	 * @param previousNodeId id of previous entry (might be null if start of new run)
+	 * @param version        Version of the previous relation
+	 * @return assigned id of this element
+	 */
+	public NodeId put(
+		final String stateName,
+		final EventId eventId,
+		@Nullable final NodeId previousNodeId,
+		final DeweyNumber version) {
+
+		if (previousNodeId != null) {
+			lockNode(previousNodeId);
+		}
+
+		NodeId currentNodeId = new NodeId(eventId, getOriginalNameFromInternal(stateName));
+		Lockable<SharedBufferNode> currentNode = sharedBuffer.getEntry(currentNodeId);
+		if (currentNode == null) {
+			currentNode = new Lockable<>(new SharedBufferNode(), 0);
+			lockEvent(eventId);
+		}
+
+		currentNode.getElement().addEdge(new SharedBufferEdge(
+			previousNodeId,
+			version));
+		sharedBuffer.upsertEntry(currentNodeId, currentNode);
+
+		return currentNodeId;
+	}
+
+	/**
+	 * Returns all elements from the previous relation starting at the given entry.
+	 *
+	 * @param nodeId  id of the starting entry
+	 * @param version Version of the previous relation which shall be extracted
+	 * @return Collection of previous relations starting with the given value
+	 */
+	public List<Map<String, List<EventId>>> extractPatterns(
+		final NodeId nodeId,
+		final DeweyNumber version) {
+
+		List<Map<String, List<EventId>>> result = new ArrayList<>();
+
+		// stack to remember the current extraction states
+		Stack<SharedBufferAccessor.ExtractionState> extractionStates = new Stack<>();
+
+		// get the starting shared buffer entry for the previous relation
+		Lockable<SharedBufferNode> entryLock = sharedBuffer.getEntry(nodeId);
+
+		if (entryLock != null) {
+			SharedBufferNode entry = entryLock.getElement();
+			extractionStates.add(new SharedBufferAccessor.ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
+
+			// use a depth first search to reconstruct the previous relations
+			while (!extractionStates.isEmpty()) {
+				final SharedBufferAccessor.ExtractionState extractionState = extractionStates.pop();
+				// current path of the depth first search
+				final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath();
+				final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry();
+
+				// termination criterion
+				if (currentEntry == null) {
+					final Map<String, List<EventId>> completePath = new LinkedHashMap<>();
+
+					while (!currentPath.isEmpty()) {
+						final NodeId currentPathEntry = currentPath.pop().f0;
+
+						String page = currentPathEntry.getPageName();
+						List<EventId> values = completePath
+							.computeIfAbsent(page, k -> new ArrayList<>());
+						values.add(currentPathEntry.getEventId());
+					}
+					result.add(completePath);
+				} else {
+
+					// append state to the path
+					currentPath.push(currentEntry);
+
+					boolean firstMatch = true;
+					for (SharedBufferEdge edge : currentEntry.f1.getEdges()) {
+						// we can only proceed if the current version is compatible to the version
+						// of this previous relation
+						final DeweyNumber currentVersion = extractionState.getVersion();
+						if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
+							final NodeId target = edge.getTarget();
+							Stack<Tuple2<NodeId, SharedBufferNode>> newPath;
+
+							if (firstMatch) {
+								// for the first match we don't have to copy the current path
+								newPath = currentPath;
+								firstMatch = false;
+							} else {
+								newPath = new Stack<>();
+								newPath.addAll(currentPath);
+							}
+
+							extractionStates.push(new SharedBufferAccessor.ExtractionState(
+								target != null ? Tuple2.of(target, sharedBuffer.getEntry(target).getElement()) : null,
+								edge.getDeweyNumber(),
+								newPath));
+						}
+					}
+				}
+
+			}
+		}
+		return result;
+	}
+
+	/**
+	 * Extracts the real event from the sharedBuffer with pre-extracted eventId.
+	 *
+	 * @param match the matched event's eventId.
+	 * @return the event associated with the eventId.
+	 */
+	public Map<String, List<V>> materializeMatch(Map<String, List<EventId>> match) {
+		Map<String, List<V>> materializedMatch = new LinkedHashMap<>(match.size());
+
+		for (Map.Entry<String, List<EventId>> pattern : match.entrySet()) {
+			List<V> events = new ArrayList<>(pattern.getValue().size());
+			for (EventId eventId : pattern.getValue()) {
+				try {
+					V event = sharedBuffer.getEvent(eventId).getElement();
+					events.add(event);
+				} catch (Exception ex) {
+					throw new WrappingRuntimeException(ex);
+				}
+			}
+			materializedMatch.put(pattern.getKey(), events);
+		}
+
+		return materializedMatch;
+	}
+
+	/**
+	 * Increases the reference counter for the given entry so that it is not
+	 * accidentally removed.
+	 *
+	 * @param node id of the entry
+	 */
+	public void lockNode(final NodeId node) {
+		Lockable<SharedBufferNode> sharedBufferNode = sharedBuffer.getEntry(node);
+		if (sharedBufferNode != null) {
+			sharedBufferNode.lock();
+			sharedBuffer.upsertEntry(node, sharedBufferNode);
+		}
+	}
+
+	/**
+	 * Decreases the reference counter for the given entry so that it can be
+	 * removed once the reference counter reaches 0.
+	 *
+	 * @param node id of the entry
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void releaseNode(final NodeId node) throws Exception {
+		Lockable<SharedBufferNode> sharedBufferNode = sharedBuffer.getEntry(node);
+		if (sharedBufferNode != null) {
+			if (sharedBufferNode.release()) {
+				removeNode(node, sharedBufferNode.getElement());
+			} else {
+				sharedBuffer.upsertEntry(node, sharedBufferNode);
+			}
+		}
+	}
+
+	/**
+	 * Removes the {@code SharedBufferNode}, when the ref is decreased to zero, and also
+	 * decrease the ref of the edge on this node.
+	 *
+	 * @param node id of the entry
+	 * @param sharedBufferNode the node body to be removed
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
+		sharedBuffer.removeEntry(node);
+		EventId eventId = node.getEventId();
+		releaseEvent(eventId);
+
+		for (SharedBufferEdge sharedBufferEdge : sharedBufferNode.getEdges()) {
+			releaseNode(sharedBufferEdge.getTarget());
+		}
+	}
+
+	/**
+	 * Increases the reference counter for the given event so that it is not
+	 * accidentally removed.
+	 *
+	 * @param eventId id of the entry
+	 */
+	private void lockEvent(EventId eventId) {
+		Lockable<V> eventWrapper = sharedBuffer.getEvent(eventId);
+		checkState(
+			eventWrapper != null,
+			"Referring to non existent event with id %s",
+			eventId);
+		eventWrapper.lock();
+		sharedBuffer.upsertEvent(eventId, eventWrapper);
+	}
+
+	/**
+	 * Decreases the reference counter for the given event so that it can be
+	 * removed once the reference counter reaches 0.
+	 *
+	 * @param eventId id of the event
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void releaseEvent(EventId eventId) throws Exception {
+		Lockable<V> eventWrapper = sharedBuffer.getEvent(eventId);
+		if (eventWrapper != null) {
+			if (eventWrapper.release()) {
+				sharedBuffer.removeEvent(eventId);
+			} else {
+				sharedBuffer.upsertEvent(eventId, eventWrapper);
+			}
+		}
+	}
+
+	/**
+	 * Persists the entry in the cache to the underlay state.
+	 *
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void close() throws Exception {
+		sharedBuffer.flushCache();
+	}
+
+	/**
+	 * Helper class to store the extraction state while extracting a sequence of values following
+	 * the versioned entry edges.
+	 */
+	private static class ExtractionState {
+
+		private final Tuple2<NodeId, SharedBufferNode> entry;
+		private final DeweyNumber version;
+		private final Stack<Tuple2<NodeId, SharedBufferNode>> path;
+
+		ExtractionState(
+			final Tuple2<NodeId, SharedBufferNode> entry,
+			final DeweyNumber version,
+			final Stack<Tuple2<NodeId, SharedBufferNode>> path) {
+			this.entry = entry;
+			this.version = version;
+			this.path = path;
+		}
+
+		public Tuple2<NodeId, SharedBufferNode> getEntry() {
+			return entry;
+		}
+
+		public Stack<Tuple2<NodeId, SharedBufferNode>> getPath() {
+			return path;
+		}
+
+		public DeweyNumber getVersion() {
+			return version;
+		}
+
+		@Override
+		public String toString() {
+			return "ExtractionState(" + entry + ", " + version + ", [" +
+				StringUtils.join(path, ", ") + "])";
+		}
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 9c263b08b53..b57c3fe0b2f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -36,6 +36,7 @@
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
@@ -375,9 +376,11 @@ private void updateNFA(NFAState nfaState) throws IOException {
 	 * @param timestamp The timestamp of the event
 	 */
 	private void processEvent(NFAState nfaState, IN event, long timestamp) throws Exception {
-		Collection<Map<String, List<IN>>> patterns =
-				nfa.process(partialMatches, nfaState, event, timestamp, afterMatchSkipStrategy);
-		processMatchedSequences(patterns, timestamp);
+		try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
+			Collection<Map<String, List<IN>>> patterns =
+				nfa.process(sharedBufferAccessor, nfaState, event, timestamp, afterMatchSkipStrategy);
+			processMatchedSequences(patterns, timestamp);
+		}
 	}
 
 	/**
@@ -385,9 +388,11 @@ private void processEvent(NFAState nfaState, IN event, long timestamp) throws Ex
 	 * <b>lower</b> than the given timestamp should be passed to the nfa, This can lead to pruning and timeouts.
 	 */
 	private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
-		Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
-			nfa.advanceTime(partialMatches, nfaState, timestamp);
-		processTimedOutSequences(timedOut, timestamp);
+		try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
+			Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
+				nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
+			processTimedOutSequences(timedOut, timestamp);
+		}
 	}
 
 	protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception;
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index b6151144139..6ffd5d2de62 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -22,6 +22,7 @@
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
@@ -32,6 +33,7 @@
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -60,10 +62,17 @@
 public class NFAITCase extends TestLogger {
 
 	private SharedBuffer<Event> sharedBuffer;
+	private SharedBufferAccessor<Event> sharedBufferAccessor;
 
 	@Before
 	public void init() {
 		sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		sharedBufferAccessor = sharedBuffer.getAccessor();
+	}
+
+	@After
+	public void clear() throws Exception{
+		sharedBufferAccessor.close();
 	}
 
 	@Test
@@ -403,9 +412,9 @@ public boolean filter(Event value) throws Exception {
 		for (StreamRecord<Event> event: events) {
 
 			Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutPatterns =
-				nfa.advanceTime(sharedBuffer, nfaState, event.getTimestamp());
+				nfa.advanceTime(sharedBufferAccessor, nfaState, event.getTimestamp());
 			Collection<Map<String, List<Event>>> matchedPatterns =
-				nfa.process(sharedBuffer, nfaState, event.getValue(), event.getTimestamp());
+				nfa.process(sharedBufferAccessor, nfaState, event.getValue(), event.getTimestamp());
 
 			resultingPatterns.addAll(matchedPatterns);
 			resultingTimeoutPatterns.addAll(timeoutPatterns);
@@ -2333,14 +2342,14 @@ public boolean filter(Event value) throws Exception {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBuffer, nfaState, startEvent, 1);
-		nfa.process(sharedBuffer, nfaState, middleEvent1, 2);
-		nfa.process(sharedBuffer, nfaState, middleEvent2, 3);
-		nfa.process(sharedBuffer, nfaState, middleEvent3, 4);
-		nfa.process(sharedBuffer, nfaState, end1, 6);
+		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 2);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 3);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent3, 4);
+		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
 
 		//pruning element
-		nfa.advanceTime(sharedBuffer, nfaState, 10);
+		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
 
 		assertEquals(1, nfaState.getPartialMatches().size());
 		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
@@ -2379,12 +2388,12 @@ public boolean filter(Event value) throws Exception {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBuffer, nfaState, startEvent, 1);
-		nfa.process(sharedBuffer, nfaState, middleEvent, 5);
-		nfa.process(sharedBuffer, nfaState, end1, 6);
+		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent, 5);
+		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
 
 		//pruning element
-		nfa.advanceTime(sharedBuffer, nfaState, 10);
+		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
 
 		assertEquals(1, nfaState.getPartialMatches().size());
 		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
@@ -2424,13 +2433,13 @@ public boolean filter(Event value) throws Exception {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBuffer, nfaState, startEvent, 1);
-		nfa.process(sharedBuffer, nfaState, middleEvent1, 3);
-		nfa.process(sharedBuffer, nfaState, middleEvent2, 4);
-		nfa.process(sharedBuffer, nfaState, end1, 6);
+		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
+		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
 
 		//pruning element
-		nfa.advanceTime(sharedBuffer, nfaState, 10);
+		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
 
 		assertEquals(1, nfaState.getPartialMatches().size());
 		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
@@ -2470,13 +2479,13 @@ public boolean filter(Event value) throws Exception {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBuffer, nfaState, startEvent, 1);
-		nfa.process(sharedBuffer, nfaState, middleEvent1, 3);
-		nfa.process(sharedBuffer, nfaState, middleEvent2, 4);
-		nfa.process(sharedBuffer, nfaState, end1, 6);
+		nfa.process(sharedBufferAccessor, nfaState, startEvent, 1);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent1, 3);
+		nfa.process(sharedBufferAccessor, nfaState, middleEvent2, 4);
+		nfa.process(sharedBufferAccessor, nfaState, end1, 6);
 
 		//pruning element
-		nfa.advanceTime(sharedBuffer, nfaState, 10);
+		nfa.advanceTime(sharedBufferAccessor, nfaState, 10);
 
 		assertEquals(1, nfaState.getPartialMatches().size());
 		assertEquals("start", nfaState.getPartialMatches().peek().getCurrentStateName());
@@ -2733,7 +2742,7 @@ public boolean filter(Event s) throws Exception {
 
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				sharedBuffer,
+				sharedBufferAccessor,
 				nfaState,
 				inputEvent.getValue(),
 				inputEvent.getTimestamp());
@@ -2808,7 +2817,7 @@ public boolean filter(Event s) throws Exception {
 
 		for (StreamRecord<Event> inputEvent : inputEvents) {
 			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				sharedBuffer,
+				sharedBufferAccessor,
 				nfaState,
 				inputEvent.getValue(),
 				inputEvent.getTimestamp());
@@ -2836,14 +2845,13 @@ public void testSharedBufferClearing() throws Exception {
 		Event a = new Event(40, "a", 1.0);
 		Event b = new Event(41, "b", 2.0);
 
-		SharedBuffer<Event> spiedBuffer = Mockito.spy(sharedBuffer);
-		NFA<Event> nfa = compile(pattern, false);
-
-		nfa.process(spiedBuffer, nfa.createInitialNFAState(), a, 1);
-		nfa.process(spiedBuffer, nfa.createInitialNFAState(), b, 2);
-		Mockito.verify(spiedBuffer, Mockito.never()).advanceTime(anyLong());
-		nfa.advanceTime(spiedBuffer, nfa.createInitialNFAState(), 2);
-		Mockito.verify(spiedBuffer, Mockito.times(1)).advanceTime(2);
-
+		try (SharedBufferAccessor<Event> accessor = Mockito.spy(sharedBuffer.getAccessor())) {
+			NFA<Event> nfa = compile(pattern, false);
+			nfa.process(accessor, nfa.createInitialNFAState(), a, 1);
+			nfa.process(accessor, nfa.createInitialNFAState(), b, 2);
+			Mockito.verify(accessor, Mockito.never()).advanceTime(anyLong());
+			nfa.advanceTime(accessor, nfa.createInitialNFAState(), 2);
+			Mockito.verify(accessor, Mockito.times(1)).advanceTime(2);
+		}
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
index ede4b23d0d4..227fe4e9507 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStateAccessTest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
@@ -102,16 +103,18 @@ public boolean filter(Event value) throws Exception {
 
 		TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
 		for (StreamRecord<Event> inputEvent : inputEvents) {
-			nfa.process(
-				sharedBuffer,
-				nfa.createInitialNFAState(),
-				inputEvent.getValue(),
-				inputEvent.getTimestamp());
+			try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
+					nfa.process(
+					accessor,
+					nfa.createInitialNFAState(),
+					inputEvent.getValue(),
+					inputEvent.getTimestamp());
+			}
 		}
 
-		assertEquals(5, sharedBuffer.getStateReads());
-		assertEquals(6, sharedBuffer.getStateWrites());
-		assertEquals(11, sharedBuffer.getStateAccesses());
+		assertEquals(2, sharedBuffer.getStateReads());
+		assertEquals(3, sharedBuffer.getStateWrites());
+		assertEquals(5, sharedBuffer.getStateAccesses());
 	}
 
 	@Test
@@ -183,15 +186,17 @@ public boolean filter(Event value) throws Exception {
 
 		TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
 		for (StreamRecord<Event> inputEvent : inputEvents) {
-			nfa.process(
-				sharedBuffer,
-				nfa.createInitialNFAState(),
-				inputEvent.getValue(),
-				inputEvent.getTimestamp());
+			try (SharedBufferAccessor<Event> accessor = sharedBuffer.getAccessor()) {
+					nfa.process(
+					accessor,
+					nfa.createInitialNFAState(),
+					inputEvent.getValue(),
+					inputEvent.getTimestamp());
+			}
 		}
 
-		assertEquals(20, sharedBuffer.getStateReads());
-		assertEquals(24, sharedBuffer.getStateWrites());
-		assertEquals(44, sharedBuffer.getStateAccesses());
+		assertEquals(8, sharedBuffer.getStateReads());
+		assertEquals(12, sharedBuffer.getStateWrites());
+		assertEquals(20, sharedBuffer.getStateAccesses());
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
index ea20bee25db..8cbfba1ec6c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAStatusChangeITCase.java
@@ -21,12 +21,14 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -44,10 +46,17 @@
 public class NFAStatusChangeITCase {
 
 	private SharedBuffer<Event> sharedBuffer;
+	private SharedBufferAccessor<Event> sharedBufferAccessor;
 
 	@Before
 	public void init() {
 		this.sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		sharedBufferAccessor = sharedBuffer.getAccessor();
+	}
+
+	@After
+	public void clear() throws Exception{
+		sharedBufferAccessor.close();
 	}
 
 	@Test
@@ -86,44 +95,44 @@ public boolean filter(Event value, Context<Event> ctx) throws Exception {
 
 		NFAState nfaState = nfa.createInitialNFAState();
 
-		nfa.process(sharedBuffer, nfaState, new Event(1, "b", 1.0), 1L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(1, "b", 1.0), 1L);
 		assertFalse("NFA status should not change as the event does not match the take condition of the 'start' state", nfaState.isStateChanged());
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(2, "a", 1.0), 2L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(2, "a", 1.0), 2L);
 		assertTrue("NFA status should change as the event matches the take condition of the 'start' state", nfaState.isStateChanged());
 
 		// the status of the queue of ComputationStatus changed,
 		// more than one ComputationStatus is generated by the event from some ComputationStatus
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(3, "f", 1.0), 3L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(3, "f", 1.0), 3L);
 		assertTrue("NFA status should change as the event matches the ignore condition and proceed condition of the 'middle:1' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(4, "f", 1.0), 4L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(4, "f", 1.0), 4L);
 		assertFalse("NFA status should not change as the event only matches the ignore condition of the 'middle:2' state and the target state is still 'middle:2'", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(5, "b", 1.0), 5L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(5, "b", 1.0), 5L);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle:2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have changed
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(6, "d", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "d", 1.0), 6L);
 		assertTrue("NFA status should change as the event matches the take condition of 'middle2' state", nfaState.isStateChanged());
 
 		// both the queue of ComputationStatus and eventSharedBuffer have not changed
 		// as the timestamp is within the window
 		nfaState.resetStateChanged();
-		nfa.advanceTime(sharedBuffer, nfaState, 8L);
+		nfa.advanceTime(sharedBufferAccessor, nfaState, 8L);
 		assertFalse("NFA status should not change as the timestamp is within the window", nfaState.isStateChanged());
 
 		// timeout ComputationStatus will be removed from the queue of ComputationStatus and timeout event will
 		// be removed from eventSharedBuffer as the timeout happens
 		nfaState.resetStateChanged();
-		Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.advanceTime(sharedBuffer, nfaState, 12L);
+		Collection<Tuple2<Map<String, List<Event>>, Long>> timeoutResults = nfa.advanceTime(sharedBufferAccessor, nfaState, 12L);
 		assertTrue("NFA status should change as timeout happens", nfaState.isStateChanged() && !timeoutResults.isEmpty());
 	}
 
@@ -155,10 +164,10 @@ public boolean filter(Event value, Context<Event> ctx) throws Exception {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(6, "start", 1.0), 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
 
 		nfaState.resetStateChanged();
-		nfa.process(sharedBuffer, nfaState, new Event(6, "a", 1.0), 7L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "a", 1.0), 7L);
 		assertTrue(nfaState.isStateChanged());
 	}
 
@@ -183,11 +192,11 @@ public boolean filter(Event value, Context<Event> ctx) throws Exception {
 		NFAState nfaState = nfa.createInitialNFAState();
 
 		nfaState.resetStateChanged();
-		nfa.advanceTime(sharedBuffer, nfaState, 6L);
-		nfa.process(sharedBuffer, nfaState, new Event(6, "start", 1.0), 6L);
+		nfa.advanceTime(sharedBufferAccessor, nfaState, 6L);
+		nfa.process(sharedBufferAccessor, nfaState, new Event(6, "start", 1.0), 6L);
 
 		nfaState.resetStateChanged();
-		nfa.advanceTime(sharedBuffer, nfaState, 17L);
+		nfa.advanceTime(sharedBufferAccessor, nfaState, 17L);
 		assertTrue(nfaState.isStateChanged());
 	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index e626b50f49d..e5f4c9ee9f6 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -20,6 +20,7 @@
 
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
@@ -184,15 +185,17 @@ public void testTimeoutWindowPruningWindowBorders() throws Exception {
 		Set<Map<String, List<Event>>> actualPatterns = new HashSet<>();
 
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		for (StreamRecord<Event> streamEvent : inputs) {
-			nfa.advanceTime(sharedBuffer, nfaState, streamEvent.getTimestamp());
-			Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
-				sharedBuffer,
-				nfaState,
-				streamEvent.getValue(),
-				streamEvent.getTimestamp());
-
-			actualPatterns.addAll(matchedPatterns);
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			for (StreamRecord<Event> streamEvent : inputs) {
+				nfa.advanceTime(sharedBufferAccessor, nfaState, streamEvent.getTimestamp());
+				Collection<Map<String, List<Event>>> matchedPatterns = nfa.process(
+					sharedBufferAccessor,
+					nfaState,
+					streamEvent.getValue(),
+					streamEvent.getTimestamp());
+
+				actualPatterns.addAll(matchedPatterns);
+			}
 		}
 
 		return actualPatterns;
@@ -287,48 +290,50 @@ public boolean filter(Event value) throws Exception {
 		patterns.add(pattern3);
 
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
-		for (Pattern<Event, ?> p : patterns) {
-			NFA<Event> nfa = compile(p, false);
-
-			Event a = new Event(40, "a", 1.0);
-			Event b = new Event(41, "b", 2.0);
-			Event c = new Event(42, "c", 3.0);
-			Event b1 = new Event(41, "b", 3.0);
-			Event b2 = new Event(41, "b", 4.0);
-			Event b3 = new Event(41, "b", 5.0);
-			Event d = new Event(43, "d", 4.0);
-
-			NFAState nfaState = nfa.createInitialNFAState();
-
-			nfa.process(sharedBuffer, nfaState, a, 1);
-			nfa.process(sharedBuffer, nfaState, b, 2);
-			nfa.process(sharedBuffer, nfaState, c, 3);
-			nfa.process(sharedBuffer, nfaState, b1, 4);
-			nfa.process(sharedBuffer, nfaState, b2, 5);
-			nfa.process(sharedBuffer, nfaState, b3, 6);
-			nfa.process(sharedBuffer, nfaState, d, 7);
-			nfa.process(sharedBuffer, nfaState, a, 8);
-
-			NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
-
-			//serialize
-			ByteArrayOutputStream baos = new ByteArrayOutputStream();
-			serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
-			baos.close();
-
-			// copy
-			ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
-			ByteArrayOutputStream out = new ByteArrayOutputStream();
-			serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
-			in.close();
-			out.close();
-
-			// deserialize
-			ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
-			NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
-			bais.close();
-
-			assertEquals(nfaState, copy);
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+
+			for (Pattern<Event, ?> p : patterns) {
+				NFA<Event> nfa = compile(p, false);
+
+				Event a = new Event(40, "a", 1.0);
+				Event b = new Event(41, "b", 2.0);
+				Event c = new Event(42, "c", 3.0);
+				Event b1 = new Event(41, "b", 3.0);
+				Event b2 = new Event(41, "b", 4.0);
+				Event b3 = new Event(41, "b", 5.0);
+				Event d = new Event(43, "d", 4.0);
+
+				NFAState nfaState = nfa.createInitialNFAState();
+
+				nfa.process(sharedBufferAccessor, nfaState, a, 1);
+				nfa.process(sharedBufferAccessor, nfaState, b, 2);
+				nfa.process(sharedBufferAccessor, nfaState, c, 3);
+				nfa.process(sharedBufferAccessor, nfaState, b1, 4);
+				nfa.process(sharedBufferAccessor, nfaState, b2, 5);
+				nfa.process(sharedBufferAccessor, nfaState, b3, 6);
+				nfa.process(sharedBufferAccessor, nfaState, d, 7);
+				nfa.process(sharedBufferAccessor, nfaState, a, 8);
+
+				NFAStateSerializer serializer = NFAStateSerializer.INSTANCE;
+
+				//serialize
+				ByteArrayOutputStream baos = new ByteArrayOutputStream();
+				serializer.serialize(nfaState, new DataOutputViewStreamWrapper(baos));
+				baos.close();
+
+				// copy
+				ByteArrayInputStream in = new ByteArrayInputStream(baos.toByteArray());
+				ByteArrayOutputStream out = new ByteArrayOutputStream();
+				serializer.duplicate().copy(new DataInputViewStreamWrapper(in), new DataOutputViewStreamWrapper(out));
+				in.close();
+				out.close();
+
+				// deserialize
+				ByteArrayInputStream bais = new ByteArrayInputStream(out.toByteArray());
+				NFAState copy = serializer.duplicate().deserialize(new DataInputViewStreamWrapper(bais));
+				bais.close();
+				assertEquals(nfaState, copy);
+			}
 		}
 	}
 
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
index 805c1421ed8..91e490ea6e9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -21,6 +21,7 @@
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.utils.TestSharedBuffer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -66,21 +67,23 @@
 		List<List<Event>> resultingPatterns = new ArrayList<>();
 
 		SharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+
 		for (StreamRecord<Event> inputEvent : inputEvents) {
-			nfa.advanceTime(sharedBuffer, nfaState, inputEvent.getTimestamp());
-			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				sharedBuffer,
-				nfaState,
-				inputEvent.getValue(),
-				inputEvent.getTimestamp(),
-				afterMatchSkipStrategy);
-
-			for (Map<String, List<Event>> p: patterns) {
-				List<Event> res = new ArrayList<>();
-				for (List<Event> le: p.values()) {
-					res.addAll(le);
+			try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+				nfa.advanceTime(sharedBufferAccessor, nfaState, inputEvent.getTimestamp());
+				Collection<Map<String, List<Event>>> patterns = nfa.process(
+					sharedBufferAccessor,
+					nfaState,
+					inputEvent.getValue(),
+					inputEvent.getTimestamp(),
+					afterMatchSkipStrategy);
+				for (Map<String, List<Event>> p: patterns) {
+					List<Event> res = new ArrayList<>();
+					for (List<Event> le: p.values()) {
+						res.addAll(le);
+					}
+					resultingPatterns.add(res);
 				}
-				resultingPatterns.add(res);
 			}
 		}
 		return resultingPatterns;
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
index e2673a946f3..0583e8b6e94 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferTest.java
@@ -92,44 +92,46 @@ public void testSharedBuffer() throws Exception {
 		expectedPattern3.put("b", new ArrayList<>());
 		expectedPattern3.get("b").add(events[7]);
 
-		NodeId a10 = sharedBuffer.put("a1", eventIds[0], null, DeweyNumber.fromString("1"));
-		NodeId aLoop0 = sharedBuffer.put("a[]", eventIds[1], a10, DeweyNumber.fromString("1.0"));
-		NodeId a11 = sharedBuffer.put("a1", eventIds[2], null, DeweyNumber.fromString("2"));
-		NodeId aLoop1 = sharedBuffer.put("a[]", eventIds[2], aLoop0, DeweyNumber.fromString("1.0"));
-		NodeId aLoop2 = sharedBuffer.put("a[]", eventIds[3], aLoop1, DeweyNumber.fromString("1.0"));
-		NodeId aSecondLoop0 = sharedBuffer.put("a[]", eventIds[3], a11, DeweyNumber.fromString("2.0"));
-		NodeId aLoop3 = sharedBuffer.put("a[]", eventIds[4], aLoop2, DeweyNumber.fromString("1.0"));
-		NodeId b0 = sharedBuffer.put("b", eventIds[5], aLoop3, DeweyNumber.fromString("1.0.0"));
-		NodeId aLoop4 = sharedBuffer.put("a[]", eventIds[5], aLoop3, DeweyNumber.fromString("1.1"));
-		NodeId b1 = sharedBuffer.put("b", eventIds[5], aSecondLoop0, DeweyNumber.fromString("2.0.0"));
-		NodeId aLoop5 = sharedBuffer.put("a[]", eventIds[6], aLoop4, DeweyNumber.fromString("1.1"));
-		NodeId b3 = sharedBuffer.put("b", eventIds[7], aLoop5, DeweyNumber.fromString("1.1.0"));
-
-		List<Map<String, List<EventId>>> patterns3 = sharedBuffer.extractPatterns(b3,
-			DeweyNumber.fromString("1.1.0"));
-		assertEquals(1L, patterns3.size());
-		assertEquals(expectedPattern3, sharedBuffer.materializeMatch(patterns3.get(0)));
-		sharedBuffer.releaseNode(b3);
-
-		List<Map<String, List<EventId>>> patterns4 = sharedBuffer.extractPatterns(b3,
-			DeweyNumber.fromString("1.1.0"));
-		assertEquals(0L, patterns4.size());
-		assertTrue(patterns4.isEmpty());
-
-		List<Map<String, List<EventId>>> patterns1 = sharedBuffer.extractPatterns(b1,
-			DeweyNumber.fromString("2.0.0"));
-		assertEquals(1L, patterns1.size());
-		assertEquals(expectedPattern1, sharedBuffer.materializeMatch(patterns1.get(0)));
-
-		List<Map<String, List<EventId>>> patterns2 = sharedBuffer.extractPatterns(b0,
-			DeweyNumber.fromString("1.0.0"));
-		assertEquals(1L, patterns2.size());
-		assertEquals(expectedPattern2, sharedBuffer.materializeMatch(patterns2.get(0)));
-		sharedBuffer.releaseNode(b1);
-		sharedBuffer.releaseNode(b0);
-
-		for (EventId eventId : eventIds) {
-			sharedBuffer.releaseEvent(eventId);
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			NodeId a10 = sharedBufferAccessor.put("a1", eventIds[0], null, DeweyNumber.fromString("1"));
+			NodeId aLoop0 = sharedBufferAccessor.put("a[]", eventIds[1], a10, DeweyNumber.fromString("1.0"));
+			NodeId a11 = sharedBufferAccessor.put("a1", eventIds[2], null, DeweyNumber.fromString("2"));
+			NodeId aLoop1 = sharedBufferAccessor.put("a[]", eventIds[2], aLoop0, DeweyNumber.fromString("1.0"));
+			NodeId aLoop2 = sharedBufferAccessor.put("a[]", eventIds[3], aLoop1, DeweyNumber.fromString("1.0"));
+			NodeId aSecondLoop0 = sharedBufferAccessor.put("a[]", eventIds[3], a11, DeweyNumber.fromString("2.0"));
+			NodeId aLoop3 = sharedBufferAccessor.put("a[]", eventIds[4], aLoop2, DeweyNumber.fromString("1.0"));
+			NodeId b0 = sharedBufferAccessor.put("b", eventIds[5], aLoop3, DeweyNumber.fromString("1.0.0"));
+			NodeId aLoop4 = sharedBufferAccessor.put("a[]", eventIds[5], aLoop3, DeweyNumber.fromString("1.1"));
+			NodeId b1 = sharedBufferAccessor.put("b", eventIds[5], aSecondLoop0, DeweyNumber.fromString("2.0.0"));
+			NodeId aLoop5 = sharedBufferAccessor.put("a[]", eventIds[6], aLoop4, DeweyNumber.fromString("1.1"));
+			NodeId b3 = sharedBufferAccessor.put("b", eventIds[7], aLoop5, DeweyNumber.fromString("1.1.0"));
+
+			List<Map<String, List<EventId>>> patterns3 = sharedBufferAccessor.extractPatterns(b3,
+				DeweyNumber.fromString("1.1.0"));
+			assertEquals(1L, patterns3.size());
+			assertEquals(expectedPattern3, sharedBufferAccessor.materializeMatch(patterns3.get(0)));
+			sharedBufferAccessor.releaseNode(b3);
+
+			List<Map<String, List<EventId>>> patterns4 = sharedBufferAccessor.extractPatterns(b3,
+				DeweyNumber.fromString("1.1.0"));
+			assertEquals(0L, patterns4.size());
+			assertTrue(patterns4.isEmpty());
+
+			List<Map<String, List<EventId>>> patterns1 = sharedBufferAccessor.extractPatterns(b1,
+				DeweyNumber.fromString("2.0.0"));
+			assertEquals(1L, patterns1.size());
+			assertEquals(expectedPattern1, sharedBufferAccessor.materializeMatch(patterns1.get(0)));
+
+			List<Map<String, List<EventId>>> patterns2 = sharedBufferAccessor.extractPatterns(b0,
+				DeweyNumber.fromString("1.0.0"));
+			assertEquals(1L, patterns2.size());
+			assertEquals(expectedPattern2, sharedBufferAccessor.materializeMatch(patterns2.get(0)));
+			sharedBufferAccessor.releaseNode(b1);
+			sharedBufferAccessor.releaseNode(b0);
+
+			for (EventId eventId : eventIds) {
+				sharedBufferAccessor.releaseEvent(eventId);
+			}
 		}
 
 		assertTrue(sharedBuffer.isEmpty());
@@ -148,20 +150,22 @@ public void testClearingSharedBufferWithMultipleEdgesBetweenEntries() throws Exc
 			eventIds[i] = sharedBuffer.registerEvent(events[i], timestamp);
 		}
 
-		NodeId start = sharedBuffer.put("start", eventIds[1], null, DeweyNumber.fromString("1"));
-		NodeId b0 = sharedBuffer.put("branching", eventIds[2], start, DeweyNumber.fromString("1.0"));
-		NodeId b1 = sharedBuffer.put("branching", eventIds[3], start, DeweyNumber.fromString("1.1"));
-		NodeId b00 = sharedBuffer.put("branching", eventIds[3], b0, DeweyNumber.fromString("1.0.0"));
-		sharedBuffer.put("branching", eventIds[4], b00, DeweyNumber.fromString("1.0.0.0"));
-		NodeId b10 = sharedBuffer.put("branching", eventIds[4], b1, DeweyNumber.fromString("1.1.0"));
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			NodeId start = sharedBufferAccessor.put("start", eventIds[1], null, DeweyNumber.fromString("1"));
+			NodeId b0 = sharedBufferAccessor.put("branching", eventIds[2], start, DeweyNumber.fromString("1.0"));
+			NodeId b1 = sharedBufferAccessor.put("branching", eventIds[3], start, DeweyNumber.fromString("1.1"));
+			NodeId b00 = sharedBufferAccessor.put("branching", eventIds[3], b0, DeweyNumber.fromString("1.0.0"));
+			sharedBufferAccessor.put("branching", eventIds[4], b00, DeweyNumber.fromString("1.0.0.0"));
+			NodeId b10 = sharedBufferAccessor.put("branching", eventIds[4], b1, DeweyNumber.fromString("1.1.0"));
 
-		//simulate IGNORE (next event can point to events[2])
-		sharedBuffer.lockNode(b0);
+			//simulate IGNORE (next event can point to events[2])
+			sharedBufferAccessor.lockNode(b0);
 
-		sharedBuffer.releaseNode(b10);
+			sharedBufferAccessor.releaseNode(b10);
 
-		for (EventId eventId : eventIds) {
-			sharedBuffer.releaseEvent(eventId);
+			for (EventId eventId : eventIds) {
+				sharedBufferAccessor.releaseEvent(eventId);
+			}
 		}
 
 		//There should be still events[1] and events[2] in the buffer
@@ -193,28 +197,30 @@ public void testSharedBufferExtractOrder() throws Exception {
 		expectedResult.put("c", new ArrayList<>());
 		expectedResult.get("c").add(events[4]);
 
-		NodeId a = sharedBuffer.put("a", eventIds[0], null, DeweyNumber.fromString("1"));
-		NodeId b = sharedBuffer.put("b", eventIds[1], a, DeweyNumber.fromString("1.0"));
-		NodeId aa = sharedBuffer.put("aa", eventIds[2], b, DeweyNumber.fromString("1.0.0"));
-		NodeId bb = sharedBuffer.put("bb", eventIds[3], aa, DeweyNumber.fromString("1.0.0.0"));
-		NodeId c = sharedBuffer.put("c", eventIds[4], bb, DeweyNumber.fromString("1.0.0.0.0"));
-
-		Map<String, List<Event>> patternsResult = sharedBuffer.materializeMatch(sharedBuffer.extractPatterns(c,
-			DeweyNumber.fromString("1.0.0.0.0")).get(0));
-
-		List<String> expectedOrder = new ArrayList<>();
-		expectedOrder.add("a");
-		expectedOrder.add("b");
-		expectedOrder.add("aa");
-		expectedOrder.add("bb");
-		expectedOrder.add("c");
-
-		for (EventId eventId : eventIds) {
-			sharedBuffer.releaseEvent(eventId);
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			NodeId a = sharedBufferAccessor.put("a", eventIds[0], null, DeweyNumber.fromString("1"));
+			NodeId b = sharedBufferAccessor.put("b", eventIds[1], a, DeweyNumber.fromString("1.0"));
+			NodeId aa = sharedBufferAccessor.put("aa", eventIds[2], b, DeweyNumber.fromString("1.0.0"));
+			NodeId bb = sharedBufferAccessor.put("bb", eventIds[3], aa, DeweyNumber.fromString("1.0.0.0"));
+			NodeId c = sharedBufferAccessor.put("c", eventIds[4], bb, DeweyNumber.fromString("1.0.0.0.0"));
+
+			Map<String, List<Event>> patternsResult = sharedBufferAccessor.materializeMatch(sharedBufferAccessor.extractPatterns(c,
+				DeweyNumber.fromString("1.0.0.0.0")).get(0));
+
+			List<String> expectedOrder = new ArrayList<>();
+			expectedOrder.add("a");
+			expectedOrder.add("b");
+			expectedOrder.add("aa");
+			expectedOrder.add("bb");
+			expectedOrder.add("c");
+
+			for (EventId eventId : eventIds) {
+				sharedBufferAccessor.releaseEvent(eventId);
+			}
+			List<String> resultOrder = new ArrayList<>(patternsResult.keySet());
+			assertEquals(expectedOrder, resultOrder);
 		}
 
-		List<String> resultOrder = new ArrayList<>(patternsResult.keySet());
-		assertEquals(expectedOrder, resultOrder);
 	}
 
 	@Test
@@ -237,4 +243,41 @@ public void testSharedBufferCountersClearing() throws Exception {
 		assertFalse(counters.hasNext());
 	}
 
+	@Test
+	public void testSharedBufferAccessor() throws Exception {
+		TestSharedBuffer<Event> sharedBuffer = TestSharedBuffer.createTestBuffer(Event.createTypeSerializer());
+		int numberEvents = 8;
+		Event[] events = new Event[numberEvents];
+		EventId[] eventIds = new EventId[numberEvents];
+		final long timestamp = 1L;
+
+		try (SharedBufferAccessor<Event> sharedBufferAccessor = sharedBuffer.getAccessor()) {
+			for (int i = 0; i < numberEvents; i++) {
+				events[i] = new Event(i + 1, "e" + (i + 1), i);
+				eventIds[i] = sharedBufferAccessor.registerEvent(events[i], timestamp);
+			}
+			assertEquals(8, sharedBuffer.getEventsBufferCacheSize());
+			assertEquals(0, sharedBuffer.getSharedBufferNodeCacheSize());
+
+			NodeId start = sharedBufferAccessor.put("start", eventIds[1], null, DeweyNumber.fromString("1"));
+			NodeId b0 = sharedBufferAccessor.put("branching", eventIds[2], start, DeweyNumber.fromString("1.0"));
+			NodeId b1 = sharedBufferAccessor.put("branching", eventIds[3], start, DeweyNumber.fromString("1.1"));
+			NodeId b00 = sharedBufferAccessor.put("branching", eventIds[3], b0, DeweyNumber.fromString("1.0.0"));
+			sharedBufferAccessor.put("branching", eventIds[4], b00, DeweyNumber.fromString("1.0.0.0"));
+
+			assertEquals(4, sharedBuffer.getSharedBufferNodeCacheSize());
+			assertEquals(0, sharedBuffer.getSharedBufferNodeSize());
+
+			sharedBufferAccessor.lockNode(b0);
+
+			for (EventId eventId : eventIds) {
+				sharedBufferAccessor.releaseEvent(eventId);
+			}
+		}
+		assertEquals(0, sharedBuffer.getEventsBufferCacheSize());
+		assertEquals(4, sharedBuffer.getEventsBufferSize());
+		assertEquals(0, sharedBuffer.getSharedBufferNodeCacheSize());
+		assertEquals(4, sharedBuffer.getSharedBufferNodeSize());
+	}
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services