You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:24 UTC

[07/10] flink git commit: [FLINK-9418] Migrate SharedBuffer to use MapState

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index a00a310..7a43537 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -1,8 +1,8 @@
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOVICE file
+ * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
- * regarding copyright ownership.  Vhe ASF licenses this file
+ * regarding copyright ownership.  The ASF licenses this file
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
@@ -26,635 +26,54 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.Lockable;
+import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
 import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
-import java.util.Stack;
+import java.util.stream.Collectors;
 
 /**
- * A shared buffer implementation which stores values under a key. Additionally, the values can be
- * versioned such that it is possible to retrieve their predecessor element in the buffer.
- *
- * <p>The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each
- * buffer page maintains a collection of the inserted values.
- *
- * <p>The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store
- * relations between different entries. A dewey versioning scheme allows to discriminate between
- * different relations (e.g. preceding element).
- *
- * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
- *
- * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
- *     https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
- *
- * @param <K> Type of the keys
- * @param <V> Type of the values
+ * @deprecated everything in this class is deprecated. Those are only migration procedures from older versions.
  */
-public class SharedBuffer<K, V> {
-
-	private Map<K, SharedBufferPage<K, V>> pages;
-
-	public SharedBuffer() {
-		this.pages = new HashMap<>(4);
-	}
-
-	/**
-	 * Stores given value (value + timestamp) under the given key. It assigns a preceding element
-	 * relation to the entry which is defined by the previous key, value (value + timestamp).
-	 *
-	 * @param key               Key of the current value
-	 * @param value             Current value
-	 * @param timestamp         Timestamp of the current value (a value requires always a timestamp to make it uniquely referable))
-	 * @param previousKey       Key of the value for the previous relation
-	 * @param previousValue     Value for the previous relation
-	 * @param previousTimestamp Timestamp of the value for the previous relation
-	 * @param version           Version of the previous relation
-	 */
-	public int put(
-			final K key,
-			final V value,
-			final long timestamp,
-			final K previousKey,
-			final V previousValue,
-			final long previousTimestamp,
-			final int previousCounter,
-			final DeweyNumber version) {
-
-		final SharedBufferEntry<K, V> previousSharedBufferEntry =
-				get(previousKey, previousValue, previousTimestamp, previousCounter);
-
-		// sanity check whether we've found the previous element
-		if (previousSharedBufferEntry == null && previousValue != null) {
-			throw new IllegalStateException("Could not find previous entry with " +
-				"key: " + previousKey + ", value: " + previousValue + " and timestamp: " +
-				previousTimestamp + ". This can indicate that either you did not implement " +
-				"the equals() and hashCode() methods of your input elements properly or that " +
-				"the element belonging to that entry has been already pruned.");
-		}
-
-		return put(key, value, timestamp, previousSharedBufferEntry, version);
-	}
-
-	/**
-	 * Stores given value (value + timestamp) under the given key. It assigns no preceding element
-	 * relation to the entry.
-	 *
-	 * @param key       Key of the current value
-	 * @param value     Current value
-	 * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable))
-	 * @param version   Version of the previous relation
-	 */
-	public int put(
-			final K key,
-			final V value,
-			final long timestamp,
-			final DeweyNumber version) {
+@Deprecated
+public class SharedBuffer<V> {
 
-		return put(key, value, timestamp, null, version);
-	}
+	private final Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext;
+	private final Map<EventId, Lockable<V>> eventsBuffer;
+	private final Map<NodeId, Lockable<SharedBufferNode>> pages;
 
-	private int put(
-			final K key,
-			final V value,
-			final long timestamp,
-			final SharedBufferEntry<K, V> previousSharedBufferEntry,
-			final DeweyNumber version) {
-
-		SharedBufferPage<K, V> page = pages.get(key);
-		if (page == null) {
-			page = new SharedBufferPage<>(key);
-			pages.put(key, page);
-		}
-
-		// this assumes that elements are processed in order (in terms of time)
-		int counter = 0;
-		if (previousSharedBufferEntry != null) {
-			ValueTimeWrapper<V> prev = previousSharedBufferEntry.getValueTime();
-			if (prev != null && prev.getTimestamp() == timestamp) {
-				counter = prev.getCounter() + 1;
-			}
-		}
-		page.add(new ValueTimeWrapper<>(value, timestamp, counter), previousSharedBufferEntry, version);
-		return counter;
+	public Map<EventId, Lockable<V>> getEventsBuffer() {
+		return eventsBuffer;
 	}
 
-	public boolean isEmpty() {
-		for (SharedBufferPage<K, V> page: pages.values()) {
-			if (!page.isEmpty()) {
-				return false;
-			}
-		}
-		return true;
+	public Map<NodeId, Lockable<SharedBufferNode>> getPages() {
+		return pages;
 	}
 
-	/**
-	 * Deletes all entries in each page which have expired with respect to given pruning timestamp.
-	 *
-	 * @param pruningTimestamp The time which is used for pruning. All elements whose timestamp is
-	 *                         lower than the pruning timestamp will be removed.
-	 * @return {@code true} if pruning happened
-	 */
-	public boolean prune(long pruningTimestamp) {
-		final Set<SharedBufferEntry<K, V>> prunedEntries = new HashSet<>();
-
-		final Iterator<Map.Entry<K, SharedBufferPage<K, V>>> it = pages.entrySet().iterator();
-		while (it.hasNext()) {
-			SharedBufferPage<K, V> page = it.next().getValue();
-
-			page.prune(pruningTimestamp, prunedEntries);
-			if (page.isEmpty()) {
-				it.remove();
-			}
-		}
-
-		if (prunedEntries.isEmpty()) {
-			return false;
-		}
+	public SharedBuffer(
+			Map<EventId, Lockable<V>> eventsBuffer,
+			Map<NodeId, Lockable<SharedBufferNode>> pages,
+			Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext) {
 
-		for (SharedBufferPage<K, V> entry : pages.values()) {
-			entry.removeEdges(prunedEntries);
-		}
-		return true;
-	}
-
-	/**
-	 * Returns all elements from the previous relation starting at the given value with the
-	 * given key and timestamp.
-	 *
-	 * @param key Key of the starting value
-	 * @param value Value of the starting element
-	 * @param timestamp Timestamp of the starting value
-	 * @param version Version of the previous relation which shall be extracted
-	 * @return Collection of previous relations starting with the given value
-	 */
-	public List<Map<K, List<V>>> extractPatterns(
-			final K key,
-			final V value,
-			final long timestamp,
-			final int counter,
-			final DeweyNumber version) {
-
-		List<Map<K, List<V>>> result = new ArrayList<>();
-
-		// stack to remember the current extraction states
-		Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
-
-		// get the starting shared buffer entry for the previous relation
-		SharedBufferEntry<K, V> entry = get(key, value, timestamp, counter);
-
-		if (entry != null) {
-			extractionStates.add(new ExtractionState<>(entry, version, new Stack<>()));
-
-			// use a depth first search to reconstruct the previous relations
-			while (!extractionStates.isEmpty()) {
-				final ExtractionState<K, V> extractionState = extractionStates.pop();
-				// current path of the depth first search
-				final Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
-				final SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
-
-				// termination criterion
-				if (currentEntry == null) {
-					final Map<K, List<V>> completePath = new LinkedHashMap<>();
-
-					while (!currentPath.isEmpty()) {
-						final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
-
-						K k = currentPathEntry.getKey();
-						List<V> values = completePath.get(k);
-						if (values == null) {
-							values = new ArrayList<>();
-							completePath.put(k, values);
-						}
-						values.add(currentPathEntry.getValueTime().getValue());
-					}
-					result.add(completePath);
-				} else {
-
-					// append state to the path
-					currentPath.push(currentEntry);
-
-					boolean firstMatch = true;
-					for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
-						// we can only proceed if the current version is compatible to the version
-						// of this previous relation
-						final DeweyNumber currentVersion = extractionState.getVersion();
-						if (currentVersion.isCompatibleWith(edge.getVersion())) {
-							if (firstMatch) {
-								// for the first match we don't have to copy the current path
-								extractionStates.push(new ExtractionState<>(edge.getTarget(), edge.getVersion(), currentPath));
-								firstMatch = false;
-							} else {
-								final Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
-								copy.addAll(currentPath);
-
-								extractionStates.push(
-									new ExtractionState<>(
-										edge.getTarget(),
-										edge.getVersion(),
-										copy));
-							}
-						}
-					}
-				}
-
-			}
-		}
-		return result;
-	}
-
-	/**
-	 * Increases the reference counter for the given value, key, timestamp entry so that it is not
-	 * accidentally removed.
-	 *
-	 * @param key       Key of the value to lock
-	 * @param value     Value to lock
-	 * @param timestamp Timestamp of the value to lock
-	 */
-	public void lock(final K key, final V value, final long timestamp, int counter) {
-		SharedBufferEntry<K, V> entry = get(key, value, timestamp, counter);
-		if (entry != null) {
-			entry.increaseReferenceCounter();
-		}
-	}
-
-	/**
-	 * Decreases the reference counter for the given value, key, timestamp entry so that it can be
-	 * removed once the reference counter reaches 0.
-	 *
-	 * @param key       Key of the value to release
-	 * @param value     Value to release
-	 * @param timestamp Timestamp of the value to release
-	 */
-	public void release(final K key, final V value, final long timestamp, int counter) {
-		SharedBufferEntry<K, V> entry = get(key, value, timestamp, counter);
-		if (entry != null) {
-			internalRemove(entry);
-		}
-	}
-
-	private SharedBuffer(Map<K, SharedBufferPage<K, V>> pages) {
+		this.eventsBuffer = eventsBuffer;
 		this.pages = pages;
+		this.mappingContext = mappingContext;
 	}
 
-	private SharedBufferEntry<K, V> get(
-			final K key,
-			final V value,
-			final long timestamp,
-			final int counter) {
-		SharedBufferPage<K, V> page = pages.get(key);
-		return page == null ? null : page.get(new ValueTimeWrapper<>(value, timestamp, counter));
-	}
-
-	private void internalRemove(final SharedBufferEntry<K, V> entry) {
-		Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
-		entriesToRemove.add(entry);
-
-		while (!entriesToRemove.isEmpty()) {
-			SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
-			currentEntry.decreaseReferenceCounter();
-
-			if (currentEntry.getReferenceCounter() == 0) {
-				currentEntry.remove();
-
-				for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
-					if (edge.getTarget() != null) {
-						entriesToRemove.push(edge.getTarget());
-					}
-				}
-			}
-		}
-	}
-
-	@Override
-	public String toString() {
-		StringBuilder builder = new StringBuilder();
-
-		for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
-			builder.append("Key: ").append(entry.getKey()).append(System.lineSeparator());
-			builder.append("Value: ").append(entry.getValue()).append(System.lineSeparator());
-		}
-
-		return builder.toString();
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (obj instanceof SharedBuffer) {
-			@SuppressWarnings("unchecked")
-			SharedBuffer<K, V> other = (SharedBuffer<K, V>) obj;
-
-			return pages.equals(other.pages);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public int hashCode() {
-		return Objects.hash(pages);
-	}
-
-	/**
-	 * The SharedBufferPage represents a set of elements which have been stored under the same key.
-	 *
-	 * @param <K> Type of the key
-	 * @param <V> Type of the value
-	 */
-	private static class SharedBufferPage<K, V> {
-
-		private final K key;
-		private final Map<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entries;
-
-		SharedBufferPage(final K key) {
-			this.key = key;
-			entries = new HashMap<>();
-		}
-
-		public K getKey() {
-			return key;
-		}
-
-		/**
-		 * Adds a new value time pair to the page. The new entry is linked to the previous entry
-		 * with the given version.
-		 *
-		 * @param valueTime Value time pair to be stored
-		 * @param previous Previous shared buffer entry to which the new entry shall be linked
-		 * @param version Version of the relation between the new and the previous entry
-		 */
-		public void add(final ValueTimeWrapper<V> valueTime, final SharedBufferEntry<K, V> previous, final DeweyNumber version) {
-			SharedBufferEntry<K, V> sharedBufferEntry = entries.get(valueTime);
-			if (sharedBufferEntry == null) {
-				sharedBufferEntry = new SharedBufferEntry<>(valueTime, this);
-				entries.put(valueTime, sharedBufferEntry);
-			}
-
-			SharedBufferEdge<K, V> newEdge;
-			if (previous != null) {
-				newEdge = new SharedBufferEdge<>(previous, version);
-				previous.increaseReferenceCounter();
-			} else {
-				newEdge = new SharedBufferEdge<>(null, version);
-			}
-			sharedBufferEntry.addEdge(newEdge);
-		}
-
-		public SharedBufferEntry<K, V> get(final ValueTimeWrapper<V> valueTime) {
-			return entries.get(valueTime);
-		}
-
-		/**
-		 * Removes all entries from the map whose timestamp is smaller than the pruning timestamp.
-		 * @param pruningTimestamp Timestamp for the pruning
-		 * @param prunedEntries a {@link Set} to put the removed {@link SharedBufferEntry SharedBufferEntries}.
-		 */
-		private void prune(final long pruningTimestamp, final Set<SharedBufferEntry<K, V>> prunedEntries) {
-			Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> it = entries.entrySet().iterator();
-			while (it.hasNext()) {
-				SharedBufferEntry<K, V> entry = it.next().getValue();
-				if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
-					prunedEntries.add(entry);
-					it.remove();
-				}
-			}
-		}
-
-		/**
-		 * Remove edges with the specified targets for the entries.
-		 */
-		private void removeEdges(final Set<SharedBufferEntry<K, V>> prunedEntries) {
-			for (SharedBufferEntry<K, V> entry : entries.values()) {
-				entry.removeEdges(prunedEntries);
-			}
-		}
-
-		public SharedBufferEntry<K, V> remove(final ValueTimeWrapper<V> valueTime) {
-			return entries.remove(valueTime);
-		}
-
-		public boolean isEmpty() {
-			return entries.isEmpty();
-		}
-
-		@Override
-		public String toString() {
-			StringBuilder builder = new StringBuilder();
-			builder.append("SharedBufferPage(" + System.lineSeparator());
-			for (SharedBufferEntry<K, V> entry: entries.values()) {
-				builder.append(entry).append(System.lineSeparator());
-			}
-			builder.append(")");
-			return builder.toString();
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (!(obj instanceof SharedBufferPage)) {
-				return false;
-			}
-			SharedBufferPage<K, V> other = (SharedBufferPage<K, V>) obj;
-			return key.equals(other.getKey()) && entries.equals(other.entries);
-		}
-
-		@Override
-		public int hashCode() {
-			int result = 1;
-			result += 31 * result + key.hashCode();
-			result += 31 * result + entries.hashCode();
-			return result;
-		}
-	}
-
-	/**
-	 * Entry of a {@link SharedBufferPage}. The entry contains the value timestamp pair, a set of
-	 * edges to other shared buffer entries denoting a relation, a reference to the owning page and
-	 * a reference counter. The reference counter counts how many references are kept to this entry.
-	 *
-	 * @param <K> Type of the key
-	 * @param <V> Type of the value
-	 */
-	private static class SharedBufferEntry<K, V> {
-
-		private final ValueTimeWrapper<V> valueTime;
-		private final Set<SharedBufferEdge<K, V>> edges;
-		private final SharedBufferPage<K, V> page;
-
-		private int referenceCounter;
-		private int entryId;
-
-		SharedBufferEntry(
-				final ValueTimeWrapper<V> valueTime,
-				final SharedBufferPage<K, V> page) {
-			this(valueTime, null, page);
-		}
-
-		SharedBufferEntry(
-				final ValueTimeWrapper<V> valueTime,
-				final SharedBufferEdge<K, V> edge,
-				final SharedBufferPage<K, V> page) {
-
-			this.valueTime = valueTime;
-			edges = new HashSet<>();
-			if (edge != null) {
-				edges.add(edge);
-			}
-			referenceCounter = 0;
-			entryId = -1;
-			this.page = page;
-		}
-
-		public ValueTimeWrapper<V> getValueTime() {
-			return valueTime;
-		}
-
-		public Set<SharedBufferEdge<K, V>> getEdges() {
-			return edges;
-		}
-
-		public K getKey() {
-			return page.getKey();
-		}
-
-		public void addEdge(SharedBufferEdge<K, V> edge) {
-			edges.add(edge);
-		}
-
-		/**
-		 * Remove edges with the specified targets.
-		 */
-		private void removeEdges(final Set<SharedBufferEntry<K, V>> prunedEntries) {
-			Iterator<SharedBufferEdge<K, V>> it = edges.iterator();
-			while (it.hasNext()) {
-				SharedBufferEdge<K, V> edge = it.next();
-				if (prunedEntries.contains(edge.getTarget())) {
-					it.remove();
-				}
-			}
-		}
-
-		public void remove() {
-			page.remove(valueTime);
-		}
-
-		public void increaseReferenceCounter() {
-			referenceCounter++;
-		}
-
-		public void decreaseReferenceCounter() {
-			if (referenceCounter > 0) {
-				referenceCounter--;
-			}
-		}
-
-		public int getReferenceCounter() {
-			return referenceCounter;
-		}
-
-		@Override
-		public String toString() {
-			return "SharedBufferEntry(" + valueTime + ", [" + StringUtils.join(edges, ", ") + "], " + referenceCounter + ")";
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (!(obj instanceof SharedBufferEntry)) {
-				return false;
-			}
-
-			@SuppressWarnings("unchecked")
-			SharedBufferEntry<K, V> other = (SharedBufferEntry<K, V>) obj;
-
-			return valueTime.equals(other.valueTime) &&
-					getKey().equals(other.getKey()) &&
-					referenceCounter == other.referenceCounter &&
-					Objects.equals(edges, other.edges);
-		}
-
-		@Override
-		public int hashCode() {
-			int result = 1;
-			result += 31 * result + valueTime.hashCode();
-			result += 31 * result + getKey().hashCode();
-			result += 31 * result + referenceCounter;
-			result += 31 * result + edges.hashCode();
-			return result;
-		}
-	}
-
-	/**
-	 * Versioned edge between two shared buffer entries.
-	 *
-	 * @param <K> Type of the key
-	 * @param <V> Type of the value
-	 */
-	private static class SharedBufferEdge<K, V> {
-		private final SharedBufferEntry<K, V> target;
-		private final DeweyNumber version;
-
-		SharedBufferEdge(final SharedBufferEntry<K, V> target, final DeweyNumber version) {
-			this.target = target;
-			this.version = version;
-		}
-
-		public SharedBufferEntry<K, V> getTarget() {
-			return target;
-		}
-
-		public DeweyNumber getVersion() {
-			return version;
-		}
-
-		@Override
-		public String toString() {
-			return "SharedBufferEdge(" + target + ", " + version + ")";
-		}
-
-		@Override
-		public boolean equals(Object obj) {
-			if (!(obj instanceof SharedBufferEdge)) {
-				return false;
-			}
-
-			@SuppressWarnings("unchecked")
-			SharedBufferEdge<K, V> other = (SharedBufferEdge<K, V>) obj;
-			if (!version.equals(other.getVersion())) {
-				return false;
-			}
-
-			if (target == null && other.getTarget() == null) {
-				return true;
-			} else if (target != null && other.getTarget() != null) {
-				return target.getKey().equals(other.getTarget().getKey()) &&
-						target.getValueTime().equals(other.getTarget().getValueTime());
-			} else {
-				return false;
-			}
-		}
-
-		@Override
-		public int hashCode() {
-			if (target != null) {
-				return Objects.hash(target.getKey(), target.getValueTime(), version);
-			} else {
-				return version.hashCode();
-			}
-		}
+	public NodeId getNodeId(String prevState, long timestamp, int counter, V event) {
+		return mappingContext.get(Tuple2.of(NFAStateNameHandler.getOriginalNameFromInternal(prevState),
+			new ValueTimeWrapper<>(event, timestamp, counter)));
 	}
 
 	/**
@@ -706,8 +125,8 @@ public class SharedBuffer<K, V> {
 			ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
 
 			return timestamp == other.getTimestamp()
-					&& Objects.equals(value, other.getValue())
-					&& counter == other.getCounter();
+				&& Objects.equals(value, other.getValue())
+				&& counter == other.getCounter();
 		}
 
 		@Override
@@ -715,17 +134,9 @@ public class SharedBuffer<K, V> {
 			return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter);
 		}
 
-		public void serialize(
-				final TypeSerializer<V> valueSerializer,
-				final DataOutputView target) throws IOException {
-			valueSerializer.serialize(value, target);
-			target.writeLong(timestamp);
-			target.writeInt(counter);
-		}
-
 		public static <V> ValueTimeWrapper<V> deserialize(
-				final TypeSerializer<V> valueSerializer,
-				final DataInputView source) throws IOException {
+			final TypeSerializer<V> valueSerializer,
+			final DataInputView source) throws IOException {
 
 			final V value = valueSerializer.deserialize(source);
 			final long timestamp = source.readLong();
@@ -736,48 +147,6 @@ public class SharedBuffer<K, V> {
 	}
 
 	/**
-	 * Helper class to store the extraction state while extracting a sequence of values following
-	 * the versioned entry edges.
-	 *
-	 * @param <K> Type of the key
-	 * @param <V> Type of the value
-	 */
-	private static class ExtractionState<K, V> {
-
-		private final SharedBufferEntry<K, V> entry;
-		private final DeweyNumber version;
-		private final Stack<SharedBufferEntry<K, V>> path;
-
-		ExtractionState(
-				final SharedBufferEntry<K, V> entry,
-				final DeweyNumber version,
-				final Stack<SharedBufferEntry<K, V>> path) {
-			this.entry = entry;
-			this.version = version;
-			this.path = path;
-		}
-
-		public SharedBufferEntry<K, V> getEntry() {
-			return entry;
-		}
-
-		public DeweyNumber getVersion() {
-			return version;
-		}
-
-		public Stack<SharedBufferEntry<K, V>> getPath() {
-			return path;
-		}
-
-		@Override
-		public String toString() {
-			return "ExtractionState(" + entry + ", " + version + ", [" +  StringUtils.join(path, ", ") + "])";
-		}
-	}
-
-	//////////////				New Serialization				////////////////////
-
-	/**
 	 * The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
 	 */
 	public static final class SharedBufferSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot {
@@ -785,12 +154,13 @@ public class SharedBuffer<K, V> {
 		private static final int VERSION = 1;
 
 		/** This empty constructor is required for deserializing the configuration. */
-		public SharedBufferSerializerConfigSnapshot() {}
+		public SharedBufferSerializerConfigSnapshot() {
+		}
 
 		public SharedBufferSerializerConfigSnapshot(
-				final TypeSerializer<K> keySerializer,
-				final TypeSerializer<V> valueSerializer,
-				final TypeSerializer<DeweyNumber> versionSerializer) {
+			final TypeSerializer<K> keySerializer,
+			final TypeSerializer<V> valueSerializer,
+			final TypeSerializer<DeweyNumber> versionSerializer) {
 
 			super(keySerializer, valueSerializer, versionSerializer);
 		}
@@ -804,7 +174,7 @@ public class SharedBuffer<K, V> {
 	/**
 	 * A {@link TypeSerializer} for the {@link SharedBuffer}.
 	 */
-	public static class SharedBufferSerializer<K, V> extends TypeSerializer<SharedBuffer<K, V>> {
+	public static class SharedBufferSerializer<K, V> extends TypeSerializer<SharedBuffer<V>> {
 
 		private static final long serialVersionUID = -3254176794680331560L;
 
@@ -815,13 +185,13 @@ public class SharedBuffer<K, V> {
 		public SharedBufferSerializer(
 				final TypeSerializer<K> keySerializer,
 				final TypeSerializer<V> valueSerializer) {
-			this(keySerializer, valueSerializer, new DeweyNumber.DeweyNumberSerializer());
+			this(keySerializer, valueSerializer, DeweyNumber.DeweyNumberSerializer.INSTANCE);
 		}
 
 		public SharedBufferSerializer(
-				final TypeSerializer<K> keySerializer,
-				final TypeSerializer<V> valueSerializer,
-				final TypeSerializer<DeweyNumber> versionSerializer) {
+			final TypeSerializer<K> keySerializer,
+			final TypeSerializer<V> valueSerializer,
+			final TypeSerializer<DeweyNumber> versionSerializer) {
 
 			this.keySerializer = keySerializer;
 			this.valueSerializer = valueSerializer;
@@ -851,32 +221,18 @@ public class SharedBuffer<K, V> {
 		}
 
 		@Override
-		public SharedBuffer<K, V> createInstance() {
-			return new SharedBuffer<>();
+		public SharedBuffer<V> createInstance() {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public SharedBuffer<K, V> copy(SharedBuffer<K, V> from) {
-			try {
-				ByteArrayOutputStream baos = new ByteArrayOutputStream();
-				serialize(from, new DataOutputViewStreamWrapper(baos));
-				baos.close();
-
-				byte[] data = baos.toByteArray();
-
-				ByteArrayInputStream bais = new ByteArrayInputStream(data);
-				SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(bais));
-				bais.close();
-
-				return copy;
-			} catch (IOException e) {
-				throw new RuntimeException("Could not copy SharredBuffer.", e);
-			}
+		public SharedBuffer<V> copy(SharedBuffer<V> from) {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public SharedBuffer<K, V> copy(SharedBuffer<K, V> from, SharedBuffer<K, V> reuse) {
-			return copy(from);
+		public SharedBuffer<V> copy(SharedBuffer<V> from, SharedBuffer<V> reuse) {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
@@ -885,83 +241,43 @@ public class SharedBuffer<K, V> {
 		}
 
 		@Override
-		public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
-			Map<K, SharedBufferPage<K, V>> pages = record.pages;
-
-			int totalEdges = 0;
-			int entryCounter = 0;
-
-			// number of pages
-			target.writeInt(pages.size());
-
-			for (SharedBufferPage<K, V> page: pages.values()) {
-
-				// key for the current page
-				keySerializer.serialize(page.getKey(), target);
-
-				target.writeInt(page.entries.size());
-				for (SharedBufferEntry<K, V> sharedBuffer: page.entries.values()) {
-
-					// assign id to the sharedBufferEntry for the future
-					// serialization of the previous relation
-					sharedBuffer.entryId = entryCounter++;
-
-					ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
-					valueTimeWrapper.serialize(valueSerializer, target);
-					target.writeInt(sharedBuffer.getReferenceCounter());
-
-					totalEdges += sharedBuffer.getEdges().size();
-				}
-			}
-
-			// write the edges between the shared buffer entries
-			target.writeInt(totalEdges);
-
-			for (SharedBufferPage<K, V> page: pages.values()) {
-				for (SharedBufferEntry<K, V> sharedBuffer: page.entries.values()) {
-
-					// in order to serialize the previous relation we simply serialize
-					// the ids of the source and target SharedBufferEntry
-
-					int sourceId = sharedBuffer.entryId;
-
-					for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
-						int targetId = -1;
-						if (edge.getTarget() != null) {
-							targetId = edge.getTarget().entryId;
-						}
-
-						target.writeInt(sourceId);
-						target.writeInt(targetId);
-						versionSerializer.serialize(edge.getVersion(), target);
-					}
-				}
-			}
+		public void serialize(SharedBuffer<V> record, DataOutputView target) throws IOException {
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
-		public SharedBuffer<K, V> deserialize(DataInputView source) throws IOException {
-			List<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
-			Map<K, SharedBufferPage<K, V>> pages = new HashMap<>();
-
+		public SharedBuffer<V> deserialize(DataInputView source) throws IOException {
+			List<Tuple2<NodeId, Lockable<SharedBufferNode>>> entries = new ArrayList<>();
+			Map<ValueTimeWrapper<V>, EventId> values = new HashMap<>();
+			Map<EventId, Lockable<V>> valuesWithIds = new HashMap<>();
+			Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext = new HashMap<>();
+			Map<Long, Long> totalEventsPerTimestamp = new HashMap<>();
 			int totalPages = source.readInt();
 
 			for (int i = 0; i < totalPages; i++) {
-
 				// key of the page
-				K key = keySerializer.deserialize(source);
-				SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
-				pages.put(key, page);
+				K stateName = keySerializer.deserialize(source);
 
 				int numberEntries = source.readInt();
 				for (int j = 0; j < numberEntries; j++) {
-					ValueTimeWrapper<V> valueTimeWrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
-					SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<>(valueTimeWrapper, page);
-					sharedBufferEntry.referenceCounter = source.readInt();
+					ValueTimeWrapper<V> wrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
+					EventId eventId = values.get(wrapper);
+					if (eventId == null) {
+						long id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0L);
+						eventId = new EventId(id, wrapper.timestamp);
+						values.put(wrapper, eventId);
+						valuesWithIds.put(eventId, new Lockable<>(wrapper.value, 1));
+						totalEventsPerTimestamp.computeIfPresent(wrapper.timestamp, (k, v) -> v + 1);
+					} else {
+						Lockable<V> eventWrapper = valuesWithIds.get(eventId);
+						eventWrapper.lock();
+					}
 
-					page.entries.put(valueTimeWrapper, sharedBufferEntry);
+					NodeId nodeId = new NodeId(eventId, (String) stateName);
+					int refCount = source.readInt();
 
-					entryList.add(sharedBufferEntry);
+					entries.add(Tuple2.of(nodeId, new Lockable<>(new SharedBufferNode(), refCount)));
+					mappingContext.put(Tuple2.of((String) stateName, wrapper), nodeId);
 				}
 			}
 
@@ -977,61 +293,25 @@ public class SharedBuffer<K, V> {
 
 				// We've already deserialized the shared buffer entry. Simply read its ID and
 				// retrieve the buffer entry from the list of entries
-				SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIdx);
-				SharedBufferEntry<K, V> targetEntry = targetIdx < 0 ? null : entryList.get(targetIdx);
-				sourceEntry.edges.add(new SharedBufferEdge<>(targetEntry, version));
+				Tuple2<NodeId, Lockable<SharedBufferNode>> sourceEntry = entries.get(sourceIdx);
+				Tuple2<NodeId, Lockable<SharedBufferNode>> targetEntry =
+					targetIdx < 0 ? Tuple2.of(null, null) : entries.get(targetIdx);
+				sourceEntry.f1.getElement().addEdge(new SharedBufferEdge(targetEntry.f0, version));
 			}
-			return new SharedBuffer<>(pages);
+
+			Map<NodeId, Lockable<SharedBufferNode>> entriesMap = entries.stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1));
+
+			return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext);
 		}
 
 		@Override
-		public SharedBuffer<K, V> deserialize(SharedBuffer<K, V> reuse, DataInputView source) throws IOException {
+		public SharedBuffer<V> deserialize(SharedBuffer<V> reuse, DataInputView source) throws IOException {
 			return deserialize(source);
 		}
 
 		@Override
 		public void copy(DataInputView source, DataOutputView target) throws IOException {
-			int numberPages = source.readInt();
-			target.writeInt(numberPages);
-
-			for (int i = 0; i < numberPages; i++) {
-				// key of the page
-				@SuppressWarnings("unchecked")
-				K key = keySerializer.deserialize(source);
-				keySerializer.serialize(key, target);
-
-				int numberEntries = source.readInt();
-
-				for (int j = 0; j < numberEntries; j++) {
-					// restore the SharedBufferEntries for the given page
-					V value = valueSerializer.deserialize(source);
-					valueSerializer.serialize(value, target);
-
-					long timestamp = source.readLong();
-					target.writeLong(timestamp);
-
-					int counter = source.readInt();
-					target.writeInt(counter);
-
-					int referenceCounter = source.readInt();
-					target.writeInt(referenceCounter);
-				}
-			}
-
-			// read the edges of the shared buffer entries
-			int numberEdges = source.readInt();
-			target.writeInt(numberEdges);
-
-			for (int j = 0; j < numberEdges; j++) {
-				int sourceIndex = source.readInt();
-				int targetIndex = source.readInt();
-
-				target.writeInt(sourceIndex);
-				target.writeInt(targetIndex);
-
-				DeweyNumber version = versionSerializer.deserialize(source);
-				versionSerializer.serialize(version, target);
-			}
+			throw new UnsupportedOperationException();
 		}
 
 		@Override
@@ -1046,7 +326,7 @@ public class SharedBuffer<K, V> {
 
 			SharedBufferSerializer other = (SharedBufferSerializer) obj;
 			return
-					Objects.equals(keySerializer, other.getKeySerializer()) &&
+				Objects.equals(keySerializer, other.getKeySerializer()) &&
 					Objects.equals(valueSerializer, other.getValueSerializer()) &&
 					Objects.equals(versionSerializer, other.getVersionSerializer());
 		}
@@ -1064,47 +344,48 @@ public class SharedBuffer<K, V> {
 		@Override
 		public TypeSerializerConfigSnapshot snapshotConfiguration() {
 			return new SharedBufferSerializerConfigSnapshot<>(
-					keySerializer,
-					valueSerializer,
-					versionSerializer);
+				keySerializer,
+				valueSerializer,
+				versionSerializer);
 		}
 
 		@Override
-		public CompatibilityResult<SharedBuffer<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+		public CompatibilityResult<SharedBuffer<V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
 			if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) {
 				List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializerConfigSnapshots =
-						((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+					((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
 
 				CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-						serializerConfigSnapshots.get(0).f0,
-						UnloadableDummyTypeSerializer.class,
-						serializerConfigSnapshots.get(0).f1,
-						keySerializer);
+					serializerConfigSnapshots.get(0).f0,
+					UnloadableDummyTypeSerializer.class,
+					serializerConfigSnapshots.get(0).f1,
+					keySerializer);
 
 				CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-						serializerConfigSnapshots.get(1).f0,
-						UnloadableDummyTypeSerializer.class,
-						serializerConfigSnapshots.get(1).f1,
-						valueSerializer);
+					serializerConfigSnapshots.get(1).f0,
+					UnloadableDummyTypeSerializer.class,
+					serializerConfigSnapshots.get(1).f1,
+					valueSerializer);
 
 				CompatibilityResult<DeweyNumber> versionCompatResult = CompatibilityUtil.resolveCompatibilityResult(
-						serializerConfigSnapshots.get(2).f0,
-						UnloadableDummyTypeSerializer.class,
-						serializerConfigSnapshots.get(2).f1,
-						versionSerializer);
+					serializerConfigSnapshots.get(2).f0,
+					UnloadableDummyTypeSerializer.class,
+					serializerConfigSnapshots.get(2).f1,
+					versionSerializer);
 
-				if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() && !versionCompatResult.isRequiresMigration()) {
+				if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() &&
+					!versionCompatResult.isRequiresMigration()) {
 					return CompatibilityResult.compatible();
 				} else {
 					if (keyCompatResult.getConvertDeserializer() != null
-							&& valueCompatResult.getConvertDeserializer() != null
-							&& versionCompatResult.getConvertDeserializer() != null) {
+						&& valueCompatResult.getConvertDeserializer() != null
+						&& versionCompatResult.getConvertDeserializer() != null) {
 						return CompatibilityResult.requiresMigration(
-								new SharedBufferSerializer<>(
-										new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
-										new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
-										new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
-								));
+							new SharedBufferSerializer<>(
+								new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+								new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
+								new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
+							));
 					}
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 5b9522b..dbb654c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.NFA;
@@ -53,29 +52,10 @@ public class NFACompiler {
 	protected static final String ENDING_STATE_NAME = "$endState$";
 
 	/**
-	 * Compiles the given pattern into a {@link NFA}.
-	 *
-	 * @param pattern Definition of sequence pattern
-	 * @param inputTypeSerializer Serializer for the input type
-	 * @param timeoutHandling True if the NFA shall return timed out event patterns
-	 * @param <T> Type of the input events
-	 * @return Non-deterministic finite automaton representing the given pattern
-	 */
-	public static <T> NFA<T> compile(
-		Pattern<T, ?> pattern,
-		TypeSerializer<T> inputTypeSerializer,
-		boolean timeoutHandling) {
-		NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer, timeoutHandling);
-
-		return factory.createNFA();
-	}
-
-	/**
 	 * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
 	 * multiple NFAs.
 	 *
 	 * @param pattern Definition of sequence pattern
-	 * @param inputTypeSerializer Serializer for the input type
 	 * @param timeoutHandling True if the NFA shall return timed out event patterns
 	 * @param <T> Type of the input events
 	 * @return Factory for NFAs corresponding to the given pattern
@@ -83,15 +63,14 @@ public class NFACompiler {
 	@SuppressWarnings("unchecked")
 	public static <T> NFAFactory<T> compileFactory(
 		final Pattern<T, ?> pattern,
-		final TypeSerializer<T> inputTypeSerializer,
 		boolean timeoutHandling) {
 		if (pattern == null) {
 			// return a factory for empty NFAs
-			return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
+			return new NFAFactoryImpl<>(0, Collections.<State<T>>emptyList(), timeoutHandling);
 		} else {
 			final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
 			nfaFactoryCompiler.compileFactory();
-			return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
+			return new NFAFactoryImpl<>(nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
 		}
 	}
 
@@ -900,18 +879,15 @@ public class NFACompiler {
 
 		private static final long serialVersionUID = 8939783698296714379L;
 
-		private final TypeSerializer<T> inputTypeSerializer;
 		private final long windowTime;
 		private final Collection<State<T>> states;
 		private final boolean timeoutHandling;
 
 		private NFAFactoryImpl(
-			TypeSerializer<T> inputTypeSerializer,
-			long windowTime,
-			Collection<State<T>> states,
-			boolean timeoutHandling) {
+				long windowTime,
+				Collection<State<T>> states,
+				boolean timeoutHandling) {
 
-			this.inputTypeSerializer = inputTypeSerializer;
 			this.windowTime = windowTime;
 			this.states = states;
 			this.timeoutHandling = timeoutHandling;
@@ -919,8 +895,7 @@ public class NFACompiler {
 
 		@Override
 		public NFA<T> createNFA() {
-			return new NFA<>(
-				inputTypeSerializer.duplicate(), windowTime, timeoutHandling, states);
+			return new NFA<>(states, windowTime, timeoutHandling);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
new file mode 100644
index 0000000..9b99ea1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Composite key for events in {@link SharedBuffer}.
+ */
+public class EventId {
+	private final long id;
+	private final long timestamp;
+
+	public EventId(long id, long timestamp) {
+		this.id = id;
+		this.timestamp = timestamp;
+	}
+
+	public long getId() {
+		return id;
+	}
+
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		EventId eventId = (EventId) o;
+		return id == eventId.id &&
+			timestamp == eventId.timestamp;
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(id, timestamp);
+	}
+
+	@Override
+	public String toString() {
+		return "EventId{" +
+			"id=" + id +
+			", timestamp=" + timestamp +
+			'}';
+	}
+
+	/** {@link TypeSerializer} for {@link EventId}. */
+	public static class EventIdSerializer extends TypeSerializerSingleton<EventId> {
+
+		private static final long serialVersionUID = -5685733582601394497L;
+
+		private EventIdSerializer() {
+		}
+
+		public static final EventIdSerializer INSTANCE = new EventIdSerializer();
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public EventId createInstance() {
+			return null;
+		}
+
+		@Override
+		public EventId copy(EventId from) {
+			return new EventId(from.id, from.timestamp);
+		}
+
+		@Override
+		public EventId copy(EventId from, EventId reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return 2 * LongSerializer.INSTANCE.getLength();
+		}
+
+		@Override
+		public void serialize(EventId record, DataOutputView target) throws IOException {
+			LongSerializer.INSTANCE.serialize(record.id, target);
+			LongSerializer.INSTANCE.serialize(record.timestamp, target);
+		}
+
+		@Override
+		public EventId deserialize(DataInputView source) throws IOException {
+			Long id = LongSerializer.INSTANCE.deserialize(source);
+			Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+
+			return new EventId(id, timestamp);
+		}
+
+		@Override
+		public EventId deserialize(EventId reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			LongSerializer.INSTANCE.copy(source, target);
+			LongSerializer.INSTANCE.copy(source, target);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj.getClass().equals(EventIdSerializer.class);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
new file mode 100644
index 0000000..ca1ecae
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Implements locking logic for incoming event and
+ * {@link SharedBufferNode} using a lock reference counter.
+ */
+public final class Lockable<T> {
+
+	private int refCounter;
+
+	private final T element;
+
+	public Lockable(T element, int refCounter) {
+		this.refCounter = refCounter;
+		this.element = element;
+	}
+
+	public void lock() {
+		refCounter += 1;
+	}
+
+	/**
+	 * Releases lock on this object. If no more locks are acquired on it, this method will return true.
+	 *
+	 * @return true if no more locks are acquired
+	 */
+	boolean release() {
+		if (refCounter <= 0) {
+			return true;
+		}
+
+		refCounter -= 1;
+		return refCounter == 0;
+	}
+
+	int getRefCounter() {
+		return refCounter;
+	}
+
+	public T getElement() {
+		return element;
+	}
+
+	@Override
+	public String toString() {
+		return "Lock{" +
+			"refCounter=" + refCounter +
+			'}';
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		Lockable<?> lockable = (Lockable<?>) o;
+		return refCounter == lockable.refCounter &&
+			Objects.equals(element, lockable.element);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(refCounter, element);
+	}
+
+	/** Serializer for {@link Lockable}. */
+	public static class LockableTypeSerializer<E> extends TypeSerializer<Lockable<E>> {
+		private static final long serialVersionUID = 3298801058463337340L;
+		private final TypeSerializer<E> elementSerializer;
+
+		LockableTypeSerializer(TypeSerializer<E> elementSerializer) {
+			this.elementSerializer = elementSerializer;
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public TypeSerializer<Lockable<E>> duplicate() {
+			return new LockableTypeSerializer<>(elementSerializer);
+		}
+
+		@Override
+		public Lockable<E> createInstance() {
+			return null;
+		}
+
+		@Override
+		public Lockable<E> copy(Lockable<E> from) {
+			return new Lockable<E>(elementSerializer.copy(from.element), from.refCounter);
+		}
+
+		@Override
+		public Lockable<E> copy(
+			Lockable<E> from, Lockable<E> reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(Lockable<E> record, DataOutputView target) throws IOException {
+			IntSerializer.INSTANCE.serialize(record.refCounter, target);
+			elementSerializer.serialize(record.element, target);
+		}
+
+		@Override
+		public Lockable<E> deserialize(DataInputView source) throws IOException {
+			Integer refCount = IntSerializer.INSTANCE.deserialize(source);
+			E record = elementSerializer.deserialize(source);
+			return new Lockable<>(record, refCount);
+		}
+
+		@Override
+		public Lockable<E> deserialize(
+			Lockable<E> reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			IntSerializer.INSTANCE.copy(source, target); // refCounter
+
+			E element = elementSerializer.deserialize(source);
+			elementSerializer.serialize(element, target);
+		}
+
+		@Override
+		public boolean equals(Object o) {
+			if (this == o) {
+				return true;
+			}
+			if (o == null || getClass() != o.getClass()) {
+				return false;
+			}
+			LockableTypeSerializer<?> that = (LockableTypeSerializer<?>) o;
+			return Objects.equals(elementSerializer, that.elementSerializer);
+		}
+
+		@Override
+		public int hashCode() {
+			return Objects.hash(elementSerializer);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj.getClass().equals(LockableTypeSerializer.class);
+		}
+
+		@Override
+		public TypeSerializerConfigSnapshot snapshotConfiguration() {
+			return elementSerializer.snapshotConfiguration();
+		}
+
+		@Override
+		public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+			CompatibilityResult<E> inputComaptibilityResult = elementSerializer.ensureCompatibility(configSnapshot);
+			if (inputComaptibilityResult.isRequiresMigration()) {
+				return CompatibilityResult.requiresMigration(new LockableTypeSerializer<>(
+					new TypeDeserializerAdapter<>(inputComaptibilityResult.getConvertDeserializer()))
+				);
+			} else {
+				return CompatibilityResult.compatible();
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
new file mode 100644
index 0000000..3a13184
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Unique identifier for {@link SharedBufferNode}.
+ */
+public class NodeId {
+
+	private final String pageName;
+	private final EventId eventId;
+
+	public NodeId(EventId eventId, String pageName) {
+		this.eventId = eventId;
+		this.pageName = pageName;
+	}
+
+	public EventId getEventId() {
+		return eventId;
+	}
+
+	public String getPageName() {
+		return pageName;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		NodeId nodeId = (NodeId) o;
+		return Objects.equals(eventId, nodeId.eventId) &&
+			Objects.equals(pageName, nodeId.pageName);
+	}
+
+	@Override
+	public int hashCode() {
+		return Objects.hash(eventId, pageName);
+	}
+
+	@Override
+	public String toString() {
+		return "NodeId{" +
+			"eventId=" + eventId +
+			", pageName='" + pageName + '\'' +
+			'}';
+	}
+
+	/** Serializer for {@link NodeId}. */
+	public static class NodeIdSerializer extends TypeSerializerSingleton<NodeId> {
+
+		private static final long serialVersionUID = 9209498028181378582L;
+
+		public static final NodeIdSerializer INSTANCE = new NodeIdSerializer();
+
+		private NodeIdSerializer() {
+		}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public NodeId createInstance() {
+			return null;
+		}
+
+		@Override
+		public NodeId copy(NodeId from) {
+			return new NodeId(from.eventId, from.pageName);
+		}
+
+		@Override
+		public NodeId copy(NodeId from, NodeId reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(NodeId record, DataOutputView target) throws IOException {
+			if (record != null) {
+				target.writeByte(1);
+				EventId.EventIdSerializer.INSTANCE.serialize(record.eventId, target);
+				StringSerializer.INSTANCE.serialize(record.pageName, target);
+			} else {
+				target.writeByte(0);
+			}
+		}
+
+		@Override
+		public NodeId deserialize(DataInputView source) throws IOException {
+			byte b = source.readByte();
+			if (b == 0) {
+				return null;
+			}
+
+			EventId eventId = EventId.EventIdSerializer.INSTANCE.deserialize(source);
+			String pageName = StringSerializer.INSTANCE.deserialize(source);
+			return new NodeId(eventId, pageName);
+		}
+
+		@Override
+		public NodeId deserialize(NodeId reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			target.writeByte(source.readByte());
+
+			LongSerializer.INSTANCE.copy(source, target); // eventId
+			LongSerializer.INSTANCE.copy(source, target); // timestamp
+			StringSerializer.INSTANCE.copy(source, target); // pageName
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj.getClass().equals(NodeIdSerializer.class);
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
new file mode 100644
index 0000000..50d997c
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOVICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  Vhe ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ *
+ * <p>The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only one copy of the event.
+ *
+ * <p>The entries in {@link SharedBuffer} are {@link SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBuffer<V> {
+
+	private static final String entriesStateName = "sharedBuffer-entries";
+	private static final String eventsStateName = "sharedBuffer-events";
+	private static final String eventsCountStateName = "sharedBuffer-events-count";
+
+	/** The buffer holding the unique events seen so far. */
+	private MapState<EventId, Lockable<V>> eventsBuffer;
+
+	/** The number of events seen so far in the stream per timestamp. */
+	private MapState<Long, Long> eventsCount;
+	private MapState<NodeId, Lockable<SharedBufferNode>> pages;
+
+	public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
+		this.eventsBuffer = stateStore.getMapState(
+			new MapStateDescriptor<>(
+				eventsStateName,
+				EventId.EventIdSerializer.INSTANCE,
+				new Lockable.LockableTypeSerializer<>(valueSerializer)));
+
+		this.pages = stateStore.getMapState(
+			new MapStateDescriptor<>(
+				entriesStateName,
+				NodeId.NodeIdSerializer.INSTANCE,
+				new Lockable.LockableTypeSerializer<>(new SharedBufferNode.SharedBufferNodeSerializer())));
+
+		this.eventsCount = stateStore.getMapState(
+			new MapStateDescriptor<>(
+				eventsCountStateName,
+				LongSerializer.INSTANCE,
+				LongSerializer.INSTANCE));
+	}
+
+	/**
+	 * Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a
+	 * lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed
+	 * after processing all {@link org.apache.flink.cep.nfa.ComputationState}s
+	 *
+	 * <p><b>NOTE:</b>Should be called only once for each unique event!
+	 *
+	 * @param value event to be registered
+	 * @return unique id of that event that should be used when putting entries to the buffer.
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public EventId registerEvent(V value, long timestamp) throws Exception {
+		Long id = eventsCount.get(timestamp);
+		if (id == null) {
+			id = 0L;
+		}
+
+		EventId eventId = new EventId(id, timestamp);
+		eventsBuffer.put(eventId, new Lockable<>(value, 1));
+		eventsCount.put(timestamp, id + 1L);
+		return eventId;
+	}
+
+	/**
+	 * Initializes underlying state with given map of events and entries. Should be used only in case of migration from
+	 * old state.
+	 *
+	 * @param events  map of events with assigned unique ids
+	 * @param entries map of SharedBufferNodes
+	 * @throws Exception Thrown if the system cannot access the state.
+	 * @deprecated Only for state migration!
+	 */
+	@Deprecated
+	public void init(
+			Map<EventId, Lockable<V>> events,
+			Map<NodeId, Lockable<SharedBufferNode>> entries) throws Exception {
+		eventsBuffer.putAll(events);
+		pages.putAll(entries);
+
+		Map<Long, Long> maxIds = events.keySet().stream().collect(Collectors.toMap(
+			EventId::getTimestamp,
+			EventId::getId,
+			Math::max
+		));
+		eventsCount.putAll(maxIds);
+	}
+
+	/**
+	 * Stores given value (value + timestamp) under the given state. It assigns no preceding element
+	 * relation to the entry.
+	 *
+	 * @param stateName name of the state that the event should be assigned to
+	 * @param eventId   unique id of event assigned by this SharedBuffer
+	 * @param version   Version of the previous relation
+	 * @return assigned id of this entry
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public NodeId put(
+			final String stateName,
+			final EventId eventId,
+			final DeweyNumber version) throws Exception {
+
+		return put(stateName, eventId, null, version);
+	}
+
+	/**
+	 * Stores given value (value + timestamp) under the given state. It assigns a preceding element
+	 * relation to the previous entry.
+	 *
+	 * @param stateName     name of the state that the event should be assigned to
+	 * @param eventId       unique id of event assigned by this SharedBuffer
+	 * @param previousNodeId id of previous entry
+	 * @param version       Version of the previous relation
+	 * @return assigned id of this element
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public NodeId put(
+			final String stateName,
+			final EventId eventId,
+			@Nullable final NodeId previousNodeId,
+			final DeweyNumber version) throws Exception {
+
+		if (previousNodeId != null) {
+			lockNode(previousNodeId);
+		}
+
+		NodeId currentNodeId = new NodeId(eventId, getOriginalNameFromInternal(stateName));
+		Lockable<SharedBufferNode> currentNode = pages.get(currentNodeId);
+		if (currentNode == null) {
+			currentNode = new Lockable<>(new SharedBufferNode(), 0);
+			lockEvent(eventId);
+		}
+
+		currentNode.getElement().addEdge(new SharedBufferEdge(
+			previousNodeId,
+			version));
+		pages.put(currentNodeId, currentNode);
+
+		return currentNodeId;
+	}
+
+	/**
+	 * Checks if there is no elements in the buffer.
+	 *
+	 * @return true if there is no elements in the buffer
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public boolean isEmpty() throws Exception {
+		return Iterables.isEmpty(eventsBuffer.keys());
+	}
+
+	/**
+	 * Returns all elements from the previous relation starting at the given entry.
+	 *
+	 * @param nodeId  id of the starting entry
+	 * @param version Version of the previous relation which shall be extracted
+	 * @return Collection of previous relations starting with the given value
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public List<Map<String, List<V>>> extractPatterns(
+			final NodeId nodeId,
+			final DeweyNumber version) throws Exception {
+
+		List<Map<String, List<V>>> result = new ArrayList<>();
+
+		// stack to remember the current extraction states
+		Stack<ExtractionState> extractionStates = new Stack<>();
+
+		// get the starting shared buffer entry for the previous relation
+		Lockable<SharedBufferNode> entryLock = pages.get(nodeId);
+
+		if (entryLock != null) {
+			SharedBufferNode entry = entryLock.getElement();
+			extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
+
+			// use a depth first search to reconstruct the previous relations
+			while (!extractionStates.isEmpty()) {
+				final ExtractionState extractionState = extractionStates.pop();
+				// current path of the depth first search
+				final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath();
+				final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry();
+
+				// termination criterion
+				if (currentEntry == null) {
+					final Map<String, List<V>> completePath = new LinkedHashMap<>();
+
+					while (!currentPath.isEmpty()) {
+						final NodeId currentPathEntry = currentPath.pop().f0;
+
+						String page = currentPathEntry.getPageName();
+						List<V> values = completePath
+							.computeIfAbsent(page, k -> new ArrayList<>());
+						values.add(eventsBuffer.get(currentPathEntry.getEventId()).getElement());
+					}
+					result.add(completePath);
+				} else {
+
+					// append state to the path
+					currentPath.push(currentEntry);
+
+					boolean firstMatch = true;
+					for (SharedBufferEdge edge : currentEntry.f1.getEdges()) {
+						// we can only proceed if the current version is compatible to the version
+						// of this previous relation
+						final DeweyNumber currentVersion = extractionState.getVersion();
+						if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
+							final NodeId target = edge.getTarget();
+							Stack<Tuple2<NodeId, SharedBufferNode>> newPath;
+
+							if (firstMatch) {
+								// for the first match we don't have to copy the current path
+								newPath = currentPath;
+								firstMatch = false;
+							} else {
+								newPath = new Stack<>();
+								newPath.addAll(currentPath);
+							}
+
+							extractionStates.push(new ExtractionState(
+								target != null ? Tuple2.of(target, pages.get(target).getElement()) : null,
+								edge.getDeweyNumber(),
+								newPath));
+						}
+					}
+				}
+
+			}
+		}
+		return result;
+	}
+
+	/**
+	 * Increases the reference counter for the given entry so that it is not
+	 * accidentally removed.
+	 *
+	 * @param node id of the entry
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void lockNode(final NodeId node) throws Exception {
+		Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+		if (sharedBufferNode != null) {
+			sharedBufferNode.lock();
+			pages.put(node, sharedBufferNode);
+		}
+	}
+
+	/**
+	 * Decreases the reference counter for the given entry so that it can be
+	 * removed once the reference counter reaches 0.
+	 *
+	 * @param node id of the entry
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void releaseNode(final NodeId node) throws Exception {
+		Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+		if (sharedBufferNode != null) {
+			if (sharedBufferNode.release()) {
+				removeNode(node, sharedBufferNode.getElement());
+			} else {
+				pages.put(node, sharedBufferNode);
+			}
+		}
+	}
+
+	private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
+		pages.remove(node);
+		EventId eventId = node.getEventId();
+		releaseEvent(eventId);
+
+		for (SharedBufferEdge sharedBufferEdge : sharedBufferNode.getEdges()) {
+			releaseNode(sharedBufferEdge.getTarget());
+		}
+	}
+
+	private void lockEvent(EventId eventId) throws Exception {
+		Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+		checkState(
+			eventWrapper != null,
+			"Referring to non existent event with id %s",
+			eventId);
+		eventWrapper.lock();
+		eventsBuffer.put(eventId, eventWrapper);
+	}
+
+	/**
+	 * Decreases the reference counter for the given event so that it can be
+	 * removed once the reference counter reaches 0.
+	 *
+	 * @param eventId id of the event
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	public void releaseEvent(EventId eventId) throws Exception {
+		Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+		if (eventWrapper != null) {
+			if (eventWrapper.release()) {
+				eventsBuffer.remove(eventId);
+			} else {
+				eventsBuffer.put(eventId, eventWrapper);
+			}
+		}
+	}
+
+	/**
+	 * Helper class to store the extraction state while extracting a sequence of values following
+	 * the versioned entry edges.
+	 */
+	private static class ExtractionState {
+
+		private final Tuple2<NodeId, SharedBufferNode> entry;
+		private final DeweyNumber version;
+		private final Stack<Tuple2<NodeId, SharedBufferNode>> path;
+
+		ExtractionState(
+				final Tuple2<NodeId, SharedBufferNode> entry,
+				final DeweyNumber version,
+				final Stack<Tuple2<NodeId, SharedBufferNode>> path) {
+			this.entry = entry;
+			this.version = version;
+			this.path = path;
+		}
+
+		public Tuple2<NodeId, SharedBufferNode> getEntry() {
+			return entry;
+		}
+
+		public Stack<Tuple2<NodeId, SharedBufferNode>> getPath() {
+			return path;
+		}
+
+		public DeweyNumber getVersion() {
+			return version;
+		}
+
+		@Override
+		public String toString() {
+			return "ExtractionState(" + entry + ", " + version + ", [" +
+				StringUtils.join(path, ", ") + "])";
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
new file mode 100644
index 0000000..c8d9021
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Versioned edge in {@link SharedBuffer} that allows retrieving predecessors.
+ */
+public class SharedBufferEdge {
+
+	private final NodeId target;
+	private final DeweyNumber deweyNumber;
+
+	/**
+	 * Creates versioned (with {@link DeweyNumber}) edge that points to the target entry.
+	 *
+	 * @param target      id of target entry
+	 * @param deweyNumber version for this edge
+	 */
+	public SharedBufferEdge(NodeId target, DeweyNumber deweyNumber) {
+		this.target = target;
+		this.deweyNumber = deweyNumber;
+	}
+
+	NodeId getTarget() {
+		return target;
+	}
+
+	DeweyNumber getDeweyNumber() {
+		return deweyNumber;
+	}
+
+	@Override
+	public String toString() {
+		return "SharedBufferEdge{" +
+			"target=" + target +
+			", deweyNumber=" + deweyNumber +
+			'}';
+	}
+
+	/** Serializer for {@link SharedBufferEdge}. */
+	public static class SharedBufferEdgeSerializer extends TypeSerializerSingleton<SharedBufferEdge> {
+
+		private static final long serialVersionUID = -5122474955050663979L;
+
+		static final SharedBufferEdgeSerializer INSTANCE = new SharedBufferEdgeSerializer();
+
+		private SharedBufferEdgeSerializer() {}
+
+		@Override
+		public boolean isImmutableType() {
+			return true;
+		}
+
+		@Override
+		public SharedBufferEdge createInstance() {
+			return null;
+		}
+
+		@Override
+		public SharedBufferEdge copy(SharedBufferEdge from) {
+			return new SharedBufferEdge(from.target, from.deweyNumber);
+		}
+
+		@Override
+		public SharedBufferEdge copy(SharedBufferEdge from, SharedBufferEdge reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(SharedBufferEdge record, DataOutputView target) throws IOException {
+			NodeId.NodeIdSerializer.INSTANCE.serialize(record.target, target);
+			DeweyNumber.DeweyNumberSerializer.INSTANCE.serialize(record.deweyNumber, target);
+		}
+
+		@Override
+		public SharedBufferEdge deserialize(DataInputView source) throws IOException {
+			NodeId target = NodeId.NodeIdSerializer.INSTANCE.deserialize(source);
+			DeweyNumber deweyNumber = DeweyNumber.DeweyNumberSerializer.INSTANCE.deserialize(source);
+			return new SharedBufferEdge(target, deweyNumber);
+		}
+
+		@Override
+		public SharedBufferEdge deserialize(SharedBufferEdge reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			NodeId.NodeIdSerializer.INSTANCE.copy(source, target);
+			DeweyNumber.DeweyNumberSerializer.INSTANCE.copy(source, target);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj.getClass().equals(SharedBufferEdgeSerializer.class);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
new file mode 100644
index 0000000..b613625
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge.SharedBufferEdgeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An entry in {@link SharedBuffer} that allows to store relations between different entries.
+ */
+public class SharedBufferNode {
+
+	private final List<SharedBufferEdge> edges;
+
+	public SharedBufferNode() {
+		edges = new ArrayList<>();
+	}
+
+	private SharedBufferNode(List<SharedBufferEdge> edges) {
+		this.edges = edges;
+	}
+
+	public List<SharedBufferEdge> getEdges() {
+		return edges;
+	}
+
+	public void addEdge(SharedBufferEdge edge) {
+		edges.add(edge);
+	}
+
+	@Override
+	public String toString() {
+		return "SharedBufferNode{" +
+			"edges=" + edges +
+			'}';
+	}
+
+	/** Serializer for {@link SharedBufferNode}. */
+	public static class SharedBufferNodeSerializer extends TypeSerializerSingleton<SharedBufferNode> {
+
+		private static final long serialVersionUID = -6687780732295439832L;
+
+		private final ListSerializer<SharedBufferEdge> edgesSerializer =
+			new ListSerializer<>(SharedBufferEdgeSerializer.INSTANCE);
+
+		@Override
+		public boolean isImmutableType() {
+			return false;
+		}
+
+		@Override
+		public SharedBufferNode createInstance() {
+			return new SharedBufferNode(new ArrayList<>());
+		}
+
+		@Override
+		public SharedBufferNode copy(SharedBufferNode from) {
+			return new SharedBufferNode(edgesSerializer.copy(from.edges));
+		}
+
+		@Override
+		public SharedBufferNode copy(SharedBufferNode from, SharedBufferNode reuse) {
+			return copy(from);
+		}
+
+		@Override
+		public int getLength() {
+			return -1;
+		}
+
+		@Override
+		public void serialize(SharedBufferNode record, DataOutputView target) throws IOException {
+			edgesSerializer.serialize(record.edges, target);
+		}
+
+		@Override
+		public SharedBufferNode deserialize(DataInputView source) throws IOException {
+			List<SharedBufferEdge> edges = edgesSerializer.deserialize(source);
+			return new SharedBufferNode(edges);
+		}
+
+		@Override
+		public SharedBufferNode deserialize(SharedBufferNode reuse, DataInputView source) throws IOException {
+			return deserialize(source);
+		}
+
+		@Override
+		public void copy(DataInputView source, DataOutputView target) throws IOException {
+			edgesSerializer.copy(source, target);
+		}
+
+		@Override
+		public boolean canEqual(Object obj) {
+			return obj.getClass().equals(SharedBufferNodeSerializer.class);
+		}
+	}
+}