You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2018/06/13 14:54:24 UTC
[07/10] flink git commit: [FLINK-9418] Migrate SharedBuffer to use
MapState
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index a00a310..7a43537 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -1,8 +1,8 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOVICE file
+ * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
- * regarding copyright ownership. Vhe ASF licenses this file
+ * regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
@@ -26,635 +26,54 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.cep.nfa.sharedbuffer.Lockable;
+import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferNode;
import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
-import org.apache.commons.lang3.StringUtils;
-
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
-import java.util.Stack;
+import java.util.stream.Collectors;
/**
- * A shared buffer implementation which stores values under a key. Additionally, the values can be
- * versioned such that it is possible to retrieve their predecessor element in the buffer.
- *
- * <p>The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each
- * buffer page maintains a collection of the inserted values.
- *
- * <p>The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store
- * relations between different entries. A dewey versioning scheme allows to discriminate between
- * different relations (e.g. preceding element).
- *
- * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
- *
- * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
- * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
- *
- * @param <K> Type of the keys
- * @param <V> Type of the values
+ * @deprecated everything in this class is deprecated. Those are only migration procedures from older versions.
*/
-public class SharedBuffer<K, V> {
-
- private Map<K, SharedBufferPage<K, V>> pages;
-
- public SharedBuffer() {
- this.pages = new HashMap<>(4);
- }
-
- /**
- * Stores given value (value + timestamp) under the given key. It assigns a preceding element
- * relation to the entry which is defined by the previous key, value (value + timestamp).
- *
- * @param key Key of the current value
- * @param value Current value
- * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable))
- * @param previousKey Key of the value for the previous relation
- * @param previousValue Value for the previous relation
- * @param previousTimestamp Timestamp of the value for the previous relation
- * @param version Version of the previous relation
- */
- public int put(
- final K key,
- final V value,
- final long timestamp,
- final K previousKey,
- final V previousValue,
- final long previousTimestamp,
- final int previousCounter,
- final DeweyNumber version) {
-
- final SharedBufferEntry<K, V> previousSharedBufferEntry =
- get(previousKey, previousValue, previousTimestamp, previousCounter);
-
- // sanity check whether we've found the previous element
- if (previousSharedBufferEntry == null && previousValue != null) {
- throw new IllegalStateException("Could not find previous entry with " +
- "key: " + previousKey + ", value: " + previousValue + " and timestamp: " +
- previousTimestamp + ". This can indicate that either you did not implement " +
- "the equals() and hashCode() methods of your input elements properly or that " +
- "the element belonging to that entry has been already pruned.");
- }
-
- return put(key, value, timestamp, previousSharedBufferEntry, version);
- }
-
- /**
- * Stores given value (value + timestamp) under the given key. It assigns no preceding element
- * relation to the entry.
- *
- * @param key Key of the current value
- * @param value Current value
- * @param timestamp Timestamp of the current value (a value requires always a timestamp to make it uniquely referable))
- * @param version Version of the previous relation
- */
- public int put(
- final K key,
- final V value,
- final long timestamp,
- final DeweyNumber version) {
+@Deprecated
+public class SharedBuffer<V> {
- return put(key, value, timestamp, null, version);
- }
+ private final Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext;
+ private final Map<EventId, Lockable<V>> eventsBuffer;
+ private final Map<NodeId, Lockable<SharedBufferNode>> pages;
- private int put(
- final K key,
- final V value,
- final long timestamp,
- final SharedBufferEntry<K, V> previousSharedBufferEntry,
- final DeweyNumber version) {
-
- SharedBufferPage<K, V> page = pages.get(key);
- if (page == null) {
- page = new SharedBufferPage<>(key);
- pages.put(key, page);
- }
-
- // this assumes that elements are processed in order (in terms of time)
- int counter = 0;
- if (previousSharedBufferEntry != null) {
- ValueTimeWrapper<V> prev = previousSharedBufferEntry.getValueTime();
- if (prev != null && prev.getTimestamp() == timestamp) {
- counter = prev.getCounter() + 1;
- }
- }
- page.add(new ValueTimeWrapper<>(value, timestamp, counter), previousSharedBufferEntry, version);
- return counter;
+ public Map<EventId, Lockable<V>> getEventsBuffer() {
+ return eventsBuffer;
}
- public boolean isEmpty() {
- for (SharedBufferPage<K, V> page: pages.values()) {
- if (!page.isEmpty()) {
- return false;
- }
- }
- return true;
+ public Map<NodeId, Lockable<SharedBufferNode>> getPages() {
+ return pages;
}
- /**
- * Deletes all entries in each page which have expired with respect to given pruning timestamp.
- *
- * @param pruningTimestamp The time which is used for pruning. All elements whose timestamp is
- * lower than the pruning timestamp will be removed.
- * @return {@code true} if pruning happened
- */
- public boolean prune(long pruningTimestamp) {
- final Set<SharedBufferEntry<K, V>> prunedEntries = new HashSet<>();
-
- final Iterator<Map.Entry<K, SharedBufferPage<K, V>>> it = pages.entrySet().iterator();
- while (it.hasNext()) {
- SharedBufferPage<K, V> page = it.next().getValue();
-
- page.prune(pruningTimestamp, prunedEntries);
- if (page.isEmpty()) {
- it.remove();
- }
- }
-
- if (prunedEntries.isEmpty()) {
- return false;
- }
+ public SharedBuffer(
+ Map<EventId, Lockable<V>> eventsBuffer,
+ Map<NodeId, Lockable<SharedBufferNode>> pages,
+ Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext) {
- for (SharedBufferPage<K, V> entry : pages.values()) {
- entry.removeEdges(prunedEntries);
- }
- return true;
- }
-
- /**
- * Returns all elements from the previous relation starting at the given value with the
- * given key and timestamp.
- *
- * @param key Key of the starting value
- * @param value Value of the starting element
- * @param timestamp Timestamp of the starting value
- * @param version Version of the previous relation which shall be extracted
- * @return Collection of previous relations starting with the given value
- */
- public List<Map<K, List<V>>> extractPatterns(
- final K key,
- final V value,
- final long timestamp,
- final int counter,
- final DeweyNumber version) {
-
- List<Map<K, List<V>>> result = new ArrayList<>();
-
- // stack to remember the current extraction states
- Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
-
- // get the starting shared buffer entry for the previous relation
- SharedBufferEntry<K, V> entry = get(key, value, timestamp, counter);
-
- if (entry != null) {
- extractionStates.add(new ExtractionState<>(entry, version, new Stack<>()));
-
- // use a depth first search to reconstruct the previous relations
- while (!extractionStates.isEmpty()) {
- final ExtractionState<K, V> extractionState = extractionStates.pop();
- // current path of the depth first search
- final Stack<SharedBufferEntry<K, V>> currentPath = extractionState.getPath();
- final SharedBufferEntry<K, V> currentEntry = extractionState.getEntry();
-
- // termination criterion
- if (currentEntry == null) {
- final Map<K, List<V>> completePath = new LinkedHashMap<>();
-
- while (!currentPath.isEmpty()) {
- final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
-
- K k = currentPathEntry.getKey();
- List<V> values = completePath.get(k);
- if (values == null) {
- values = new ArrayList<>();
- completePath.put(k, values);
- }
- values.add(currentPathEntry.getValueTime().getValue());
- }
- result.add(completePath);
- } else {
-
- // append state to the path
- currentPath.push(currentEntry);
-
- boolean firstMatch = true;
- for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
- // we can only proceed if the current version is compatible to the version
- // of this previous relation
- final DeweyNumber currentVersion = extractionState.getVersion();
- if (currentVersion.isCompatibleWith(edge.getVersion())) {
- if (firstMatch) {
- // for the first match we don't have to copy the current path
- extractionStates.push(new ExtractionState<>(edge.getTarget(), edge.getVersion(), currentPath));
- firstMatch = false;
- } else {
- final Stack<SharedBufferEntry<K, V>> copy = new Stack<>();
- copy.addAll(currentPath);
-
- extractionStates.push(
- new ExtractionState<>(
- edge.getTarget(),
- edge.getVersion(),
- copy));
- }
- }
- }
- }
-
- }
- }
- return result;
- }
-
- /**
- * Increases the reference counter for the given value, key, timestamp entry so that it is not
- * accidentally removed.
- *
- * @param key Key of the value to lock
- * @param value Value to lock
- * @param timestamp Timestamp of the value to lock
- */
- public void lock(final K key, final V value, final long timestamp, int counter) {
- SharedBufferEntry<K, V> entry = get(key, value, timestamp, counter);
- if (entry != null) {
- entry.increaseReferenceCounter();
- }
- }
-
- /**
- * Decreases the reference counter for the given value, key, timestamp entry so that it can be
- * removed once the reference counter reaches 0.
- *
- * @param key Key of the value to release
- * @param value Value to release
- * @param timestamp Timestamp of the value to release
- */
- public void release(final K key, final V value, final long timestamp, int counter) {
- SharedBufferEntry<K, V> entry = get(key, value, timestamp, counter);
- if (entry != null) {
- internalRemove(entry);
- }
- }
-
- private SharedBuffer(Map<K, SharedBufferPage<K, V>> pages) {
+ this.eventsBuffer = eventsBuffer;
this.pages = pages;
+ this.mappingContext = mappingContext;
}
- private SharedBufferEntry<K, V> get(
- final K key,
- final V value,
- final long timestamp,
- final int counter) {
- SharedBufferPage<K, V> page = pages.get(key);
- return page == null ? null : page.get(new ValueTimeWrapper<>(value, timestamp, counter));
- }
-
- private void internalRemove(final SharedBufferEntry<K, V> entry) {
- Stack<SharedBufferEntry<K, V>> entriesToRemove = new Stack<>();
- entriesToRemove.add(entry);
-
- while (!entriesToRemove.isEmpty()) {
- SharedBufferEntry<K, V> currentEntry = entriesToRemove.pop();
- currentEntry.decreaseReferenceCounter();
-
- if (currentEntry.getReferenceCounter() == 0) {
- currentEntry.remove();
-
- for (SharedBufferEdge<K, V> edge : currentEntry.getEdges()) {
- if (edge.getTarget() != null) {
- entriesToRemove.push(edge.getTarget());
- }
- }
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
-
- for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
- builder.append("Key: ").append(entry.getKey()).append(System.lineSeparator());
- builder.append("Value: ").append(entry.getValue()).append(System.lineSeparator());
- }
-
- return builder.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof SharedBuffer) {
- @SuppressWarnings("unchecked")
- SharedBuffer<K, V> other = (SharedBuffer<K, V>) obj;
-
- return pages.equals(other.pages);
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(pages);
- }
-
- /**
- * The SharedBufferPage represents a set of elements which have been stored under the same key.
- *
- * @param <K> Type of the key
- * @param <V> Type of the value
- */
- private static class SharedBufferPage<K, V> {
-
- private final K key;
- private final Map<ValueTimeWrapper<V>, SharedBufferEntry<K, V>> entries;
-
- SharedBufferPage(final K key) {
- this.key = key;
- entries = new HashMap<>();
- }
-
- public K getKey() {
- return key;
- }
-
- /**
- * Adds a new value time pair to the page. The new entry is linked to the previous entry
- * with the given version.
- *
- * @param valueTime Value time pair to be stored
- * @param previous Previous shared buffer entry to which the new entry shall be linked
- * @param version Version of the relation between the new and the previous entry
- */
- public void add(final ValueTimeWrapper<V> valueTime, final SharedBufferEntry<K, V> previous, final DeweyNumber version) {
- SharedBufferEntry<K, V> sharedBufferEntry = entries.get(valueTime);
- if (sharedBufferEntry == null) {
- sharedBufferEntry = new SharedBufferEntry<>(valueTime, this);
- entries.put(valueTime, sharedBufferEntry);
- }
-
- SharedBufferEdge<K, V> newEdge;
- if (previous != null) {
- newEdge = new SharedBufferEdge<>(previous, version);
- previous.increaseReferenceCounter();
- } else {
- newEdge = new SharedBufferEdge<>(null, version);
- }
- sharedBufferEntry.addEdge(newEdge);
- }
-
- public SharedBufferEntry<K, V> get(final ValueTimeWrapper<V> valueTime) {
- return entries.get(valueTime);
- }
-
- /**
- * Removes all entries from the map whose timestamp is smaller than the pruning timestamp.
- * @param pruningTimestamp Timestamp for the pruning
- * @param prunedEntries a {@link Set} to put the removed {@link SharedBufferEntry SharedBufferEntries}.
- */
- private void prune(final long pruningTimestamp, final Set<SharedBufferEntry<K, V>> prunedEntries) {
- Iterator<Map.Entry<ValueTimeWrapper<V>, SharedBufferEntry<K, V>>> it = entries.entrySet().iterator();
- while (it.hasNext()) {
- SharedBufferEntry<K, V> entry = it.next().getValue();
- if (entry.getValueTime().getTimestamp() <= pruningTimestamp) {
- prunedEntries.add(entry);
- it.remove();
- }
- }
- }
-
- /**
- * Remove edges with the specified targets for the entries.
- */
- private void removeEdges(final Set<SharedBufferEntry<K, V>> prunedEntries) {
- for (SharedBufferEntry<K, V> entry : entries.values()) {
- entry.removeEdges(prunedEntries);
- }
- }
-
- public SharedBufferEntry<K, V> remove(final ValueTimeWrapper<V> valueTime) {
- return entries.remove(valueTime);
- }
-
- public boolean isEmpty() {
- return entries.isEmpty();
- }
-
- @Override
- public String toString() {
- StringBuilder builder = new StringBuilder();
- builder.append("SharedBufferPage(" + System.lineSeparator());
- for (SharedBufferEntry<K, V> entry: entries.values()) {
- builder.append(entry).append(System.lineSeparator());
- }
- builder.append(")");
- return builder.toString();
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof SharedBufferPage)) {
- return false;
- }
- SharedBufferPage<K, V> other = (SharedBufferPage<K, V>) obj;
- return key.equals(other.getKey()) && entries.equals(other.entries);
- }
-
- @Override
- public int hashCode() {
- int result = 1;
- result += 31 * result + key.hashCode();
- result += 31 * result + entries.hashCode();
- return result;
- }
- }
-
- /**
- * Entry of a {@link SharedBufferPage}. The entry contains the value timestamp pair, a set of
- * edges to other shared buffer entries denoting a relation, a reference to the owning page and
- * a reference counter. The reference counter counts how many references are kept to this entry.
- *
- * @param <K> Type of the key
- * @param <V> Type of the value
- */
- private static class SharedBufferEntry<K, V> {
-
- private final ValueTimeWrapper<V> valueTime;
- private final Set<SharedBufferEdge<K, V>> edges;
- private final SharedBufferPage<K, V> page;
-
- private int referenceCounter;
- private int entryId;
-
- SharedBufferEntry(
- final ValueTimeWrapper<V> valueTime,
- final SharedBufferPage<K, V> page) {
- this(valueTime, null, page);
- }
-
- SharedBufferEntry(
- final ValueTimeWrapper<V> valueTime,
- final SharedBufferEdge<K, V> edge,
- final SharedBufferPage<K, V> page) {
-
- this.valueTime = valueTime;
- edges = new HashSet<>();
- if (edge != null) {
- edges.add(edge);
- }
- referenceCounter = 0;
- entryId = -1;
- this.page = page;
- }
-
- public ValueTimeWrapper<V> getValueTime() {
- return valueTime;
- }
-
- public Set<SharedBufferEdge<K, V>> getEdges() {
- return edges;
- }
-
- public K getKey() {
- return page.getKey();
- }
-
- public void addEdge(SharedBufferEdge<K, V> edge) {
- edges.add(edge);
- }
-
- /**
- * Remove edges with the specified targets.
- */
- private void removeEdges(final Set<SharedBufferEntry<K, V>> prunedEntries) {
- Iterator<SharedBufferEdge<K, V>> it = edges.iterator();
- while (it.hasNext()) {
- SharedBufferEdge<K, V> edge = it.next();
- if (prunedEntries.contains(edge.getTarget())) {
- it.remove();
- }
- }
- }
-
- public void remove() {
- page.remove(valueTime);
- }
-
- public void increaseReferenceCounter() {
- referenceCounter++;
- }
-
- public void decreaseReferenceCounter() {
- if (referenceCounter > 0) {
- referenceCounter--;
- }
- }
-
- public int getReferenceCounter() {
- return referenceCounter;
- }
-
- @Override
- public String toString() {
- return "SharedBufferEntry(" + valueTime + ", [" + StringUtils.join(edges, ", ") + "], " + referenceCounter + ")";
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof SharedBufferEntry)) {
- return false;
- }
-
- @SuppressWarnings("unchecked")
- SharedBufferEntry<K, V> other = (SharedBufferEntry<K, V>) obj;
-
- return valueTime.equals(other.valueTime) &&
- getKey().equals(other.getKey()) &&
- referenceCounter == other.referenceCounter &&
- Objects.equals(edges, other.edges);
- }
-
- @Override
- public int hashCode() {
- int result = 1;
- result += 31 * result + valueTime.hashCode();
- result += 31 * result + getKey().hashCode();
- result += 31 * result + referenceCounter;
- result += 31 * result + edges.hashCode();
- return result;
- }
- }
-
- /**
- * Versioned edge between two shared buffer entries.
- *
- * @param <K> Type of the key
- * @param <V> Type of the value
- */
- private static class SharedBufferEdge<K, V> {
- private final SharedBufferEntry<K, V> target;
- private final DeweyNumber version;
-
- SharedBufferEdge(final SharedBufferEntry<K, V> target, final DeweyNumber version) {
- this.target = target;
- this.version = version;
- }
-
- public SharedBufferEntry<K, V> getTarget() {
- return target;
- }
-
- public DeweyNumber getVersion() {
- return version;
- }
-
- @Override
- public String toString() {
- return "SharedBufferEdge(" + target + ", " + version + ")";
- }
-
- @Override
- public boolean equals(Object obj) {
- if (!(obj instanceof SharedBufferEdge)) {
- return false;
- }
-
- @SuppressWarnings("unchecked")
- SharedBufferEdge<K, V> other = (SharedBufferEdge<K, V>) obj;
- if (!version.equals(other.getVersion())) {
- return false;
- }
-
- if (target == null && other.getTarget() == null) {
- return true;
- } else if (target != null && other.getTarget() != null) {
- return target.getKey().equals(other.getTarget().getKey()) &&
- target.getValueTime().equals(other.getTarget().getValueTime());
- } else {
- return false;
- }
- }
-
- @Override
- public int hashCode() {
- if (target != null) {
- return Objects.hash(target.getKey(), target.getValueTime(), version);
- } else {
- return version.hashCode();
- }
- }
+ public NodeId getNodeId(String prevState, long timestamp, int counter, V event) {
+ return mappingContext.get(Tuple2.of(NFAStateNameHandler.getOriginalNameFromInternal(prevState),
+ new ValueTimeWrapper<>(event, timestamp, counter)));
}
/**
@@ -706,8 +125,8 @@ public class SharedBuffer<K, V> {
ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
return timestamp == other.getTimestamp()
- && Objects.equals(value, other.getValue())
- && counter == other.getCounter();
+ && Objects.equals(value, other.getValue())
+ && counter == other.getCounter();
}
@Override
@@ -715,17 +134,9 @@ public class SharedBuffer<K, V> {
return (int) (31 * (31 * (timestamp ^ timestamp >>> 32) + value.hashCode()) + counter);
}
- public void serialize(
- final TypeSerializer<V> valueSerializer,
- final DataOutputView target) throws IOException {
- valueSerializer.serialize(value, target);
- target.writeLong(timestamp);
- target.writeInt(counter);
- }
-
public static <V> ValueTimeWrapper<V> deserialize(
- final TypeSerializer<V> valueSerializer,
- final DataInputView source) throws IOException {
+ final TypeSerializer<V> valueSerializer,
+ final DataInputView source) throws IOException {
final V value = valueSerializer.deserialize(source);
final long timestamp = source.readLong();
@@ -736,48 +147,6 @@ public class SharedBuffer<K, V> {
}
/**
- * Helper class to store the extraction state while extracting a sequence of values following
- * the versioned entry edges.
- *
- * @param <K> Type of the key
- * @param <V> Type of the value
- */
- private static class ExtractionState<K, V> {
-
- private final SharedBufferEntry<K, V> entry;
- private final DeweyNumber version;
- private final Stack<SharedBufferEntry<K, V>> path;
-
- ExtractionState(
- final SharedBufferEntry<K, V> entry,
- final DeweyNumber version,
- final Stack<SharedBufferEntry<K, V>> path) {
- this.entry = entry;
- this.version = version;
- this.path = path;
- }
-
- public SharedBufferEntry<K, V> getEntry() {
- return entry;
- }
-
- public DeweyNumber getVersion() {
- return version;
- }
-
- public Stack<SharedBufferEntry<K, V>> getPath() {
- return path;
- }
-
- @Override
- public String toString() {
- return "ExtractionState(" + entry + ", " + version + ", [" + StringUtils.join(path, ", ") + "])";
- }
- }
-
- ////////////// New Serialization ////////////////////
-
- /**
* The {@link TypeSerializerConfigSnapshot} serializer configuration to be stored with the managed state.
*/
public static final class SharedBufferSerializerConfigSnapshot<K, V> extends CompositeTypeSerializerConfigSnapshot {
@@ -785,12 +154,13 @@ public class SharedBuffer<K, V> {
private static final int VERSION = 1;
/** This empty constructor is required for deserializing the configuration. */
- public SharedBufferSerializerConfigSnapshot() {}
+ public SharedBufferSerializerConfigSnapshot() {
+ }
public SharedBufferSerializerConfigSnapshot(
- final TypeSerializer<K> keySerializer,
- final TypeSerializer<V> valueSerializer,
- final TypeSerializer<DeweyNumber> versionSerializer) {
+ final TypeSerializer<K> keySerializer,
+ final TypeSerializer<V> valueSerializer,
+ final TypeSerializer<DeweyNumber> versionSerializer) {
super(keySerializer, valueSerializer, versionSerializer);
}
@@ -804,7 +174,7 @@ public class SharedBuffer<K, V> {
/**
* A {@link TypeSerializer} for the {@link SharedBuffer}.
*/
- public static class SharedBufferSerializer<K, V> extends TypeSerializer<SharedBuffer<K, V>> {
+ public static class SharedBufferSerializer<K, V> extends TypeSerializer<SharedBuffer<V>> {
private static final long serialVersionUID = -3254176794680331560L;
@@ -815,13 +185,13 @@ public class SharedBuffer<K, V> {
public SharedBufferSerializer(
final TypeSerializer<K> keySerializer,
final TypeSerializer<V> valueSerializer) {
- this(keySerializer, valueSerializer, new DeweyNumber.DeweyNumberSerializer());
+ this(keySerializer, valueSerializer, DeweyNumber.DeweyNumberSerializer.INSTANCE);
}
public SharedBufferSerializer(
- final TypeSerializer<K> keySerializer,
- final TypeSerializer<V> valueSerializer,
- final TypeSerializer<DeweyNumber> versionSerializer) {
+ final TypeSerializer<K> keySerializer,
+ final TypeSerializer<V> valueSerializer,
+ final TypeSerializer<DeweyNumber> versionSerializer) {
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
@@ -851,32 +221,18 @@ public class SharedBuffer<K, V> {
}
@Override
- public SharedBuffer<K, V> createInstance() {
- return new SharedBuffer<>();
+ public SharedBuffer<V> createInstance() {
+ throw new UnsupportedOperationException();
}
@Override
- public SharedBuffer<K, V> copy(SharedBuffer<K, V> from) {
- try {
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- serialize(from, new DataOutputViewStreamWrapper(baos));
- baos.close();
-
- byte[] data = baos.toByteArray();
-
- ByteArrayInputStream bais = new ByteArrayInputStream(data);
- SharedBuffer<K, V> copy = deserialize(new DataInputViewStreamWrapper(bais));
- bais.close();
-
- return copy;
- } catch (IOException e) {
- throw new RuntimeException("Could not copy SharredBuffer.", e);
- }
+ public SharedBuffer<V> copy(SharedBuffer<V> from) {
+ throw new UnsupportedOperationException();
}
@Override
- public SharedBuffer<K, V> copy(SharedBuffer<K, V> from, SharedBuffer<K, V> reuse) {
- return copy(from);
+ public SharedBuffer<V> copy(SharedBuffer<V> from, SharedBuffer<V> reuse) {
+ throw new UnsupportedOperationException();
}
@Override
@@ -885,83 +241,43 @@ public class SharedBuffer<K, V> {
}
@Override
- public void serialize(SharedBuffer<K, V> record, DataOutputView target) throws IOException {
- Map<K, SharedBufferPage<K, V>> pages = record.pages;
-
- int totalEdges = 0;
- int entryCounter = 0;
-
- // number of pages
- target.writeInt(pages.size());
-
- for (SharedBufferPage<K, V> page: pages.values()) {
-
- // key for the current page
- keySerializer.serialize(page.getKey(), target);
-
- target.writeInt(page.entries.size());
- for (SharedBufferEntry<K, V> sharedBuffer: page.entries.values()) {
-
- // assign id to the sharedBufferEntry for the future
- // serialization of the previous relation
- sharedBuffer.entryId = entryCounter++;
-
- ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
- valueTimeWrapper.serialize(valueSerializer, target);
- target.writeInt(sharedBuffer.getReferenceCounter());
-
- totalEdges += sharedBuffer.getEdges().size();
- }
- }
-
- // write the edges between the shared buffer entries
- target.writeInt(totalEdges);
-
- for (SharedBufferPage<K, V> page: pages.values()) {
- for (SharedBufferEntry<K, V> sharedBuffer: page.entries.values()) {
-
- // in order to serialize the previous relation we simply serialize
- // the ids of the source and target SharedBufferEntry
-
- int sourceId = sharedBuffer.entryId;
-
- for (SharedBufferEdge<K, V> edge: sharedBuffer.edges) {
- int targetId = -1;
- if (edge.getTarget() != null) {
- targetId = edge.getTarget().entryId;
- }
-
- target.writeInt(sourceId);
- target.writeInt(targetId);
- versionSerializer.serialize(edge.getVersion(), target);
- }
- }
- }
+ public void serialize(SharedBuffer<V> record, DataOutputView target) throws IOException {
+ throw new UnsupportedOperationException();
}
@Override
- public SharedBuffer<K, V> deserialize(DataInputView source) throws IOException {
- List<SharedBufferEntry<K, V>> entryList = new ArrayList<>();
- Map<K, SharedBufferPage<K, V>> pages = new HashMap<>();
-
+ public SharedBuffer<V> deserialize(DataInputView source) throws IOException {
+ List<Tuple2<NodeId, Lockable<SharedBufferNode>>> entries = new ArrayList<>();
+ Map<ValueTimeWrapper<V>, EventId> values = new HashMap<>();
+ Map<EventId, Lockable<V>> valuesWithIds = new HashMap<>();
+ Map<Tuple2<String, ValueTimeWrapper<V>>, NodeId> mappingContext = new HashMap<>();
+ Map<Long, Long> totalEventsPerTimestamp = new HashMap<>();
int totalPages = source.readInt();
for (int i = 0; i < totalPages; i++) {
-
// key of the page
- K key = keySerializer.deserialize(source);
- SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
- pages.put(key, page);
+ K stateName = keySerializer.deserialize(source);
int numberEntries = source.readInt();
for (int j = 0; j < numberEntries; j++) {
- ValueTimeWrapper<V> valueTimeWrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
- SharedBufferEntry<K, V> sharedBufferEntry = new SharedBufferEntry<>(valueTimeWrapper, page);
- sharedBufferEntry.referenceCounter = source.readInt();
+ ValueTimeWrapper<V> wrapper = ValueTimeWrapper.deserialize(valueSerializer, source);
+ EventId eventId = values.get(wrapper);
+ if (eventId == null) {
+ long id = totalEventsPerTimestamp.computeIfAbsent(wrapper.timestamp, k -> 0L);
+ eventId = new EventId(id, wrapper.timestamp);
+ values.put(wrapper, eventId);
+ valuesWithIds.put(eventId, new Lockable<>(wrapper.value, 1));
+ totalEventsPerTimestamp.computeIfPresent(wrapper.timestamp, (k, v) -> v + 1);
+ } else {
+ Lockable<V> eventWrapper = valuesWithIds.get(eventId);
+ eventWrapper.lock();
+ }
- page.entries.put(valueTimeWrapper, sharedBufferEntry);
+ NodeId nodeId = new NodeId(eventId, (String) stateName);
+ int refCount = source.readInt();
- entryList.add(sharedBufferEntry);
+ entries.add(Tuple2.of(nodeId, new Lockable<>(new SharedBufferNode(), refCount)));
+ mappingContext.put(Tuple2.of((String) stateName, wrapper), nodeId);
}
}
@@ -977,61 +293,25 @@ public class SharedBuffer<K, V> {
// We've already deserialized the shared buffer entry. Simply read its ID and
// retrieve the buffer entry from the list of entries
- SharedBufferEntry<K, V> sourceEntry = entryList.get(sourceIdx);
- SharedBufferEntry<K, V> targetEntry = targetIdx < 0 ? null : entryList.get(targetIdx);
- sourceEntry.edges.add(new SharedBufferEdge<>(targetEntry, version));
+ Tuple2<NodeId, Lockable<SharedBufferNode>> sourceEntry = entries.get(sourceIdx);
+ Tuple2<NodeId, Lockable<SharedBufferNode>> targetEntry =
+ targetIdx < 0 ? Tuple2.of(null, null) : entries.get(targetIdx);
+ sourceEntry.f1.getElement().addEdge(new SharedBufferEdge(targetEntry.f0, version));
}
- return new SharedBuffer<>(pages);
+
+ Map<NodeId, Lockable<SharedBufferNode>> entriesMap = entries.stream().collect(Collectors.toMap(e -> e.f0, e -> e.f1));
+
+ return new SharedBuffer<>(valuesWithIds, entriesMap, mappingContext);
}
@Override
- public SharedBuffer<K, V> deserialize(SharedBuffer<K, V> reuse, DataInputView source) throws IOException {
+ public SharedBuffer<V> deserialize(SharedBuffer<V> reuse, DataInputView source) throws IOException {
return deserialize(source);
}
@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
- int numberPages = source.readInt();
- target.writeInt(numberPages);
-
- for (int i = 0; i < numberPages; i++) {
- // key of the page
- @SuppressWarnings("unchecked")
- K key = keySerializer.deserialize(source);
- keySerializer.serialize(key, target);
-
- int numberEntries = source.readInt();
-
- for (int j = 0; j < numberEntries; j++) {
- // restore the SharedBufferEntries for the given page
- V value = valueSerializer.deserialize(source);
- valueSerializer.serialize(value, target);
-
- long timestamp = source.readLong();
- target.writeLong(timestamp);
-
- int counter = source.readInt();
- target.writeInt(counter);
-
- int referenceCounter = source.readInt();
- target.writeInt(referenceCounter);
- }
- }
-
- // read the edges of the shared buffer entries
- int numberEdges = source.readInt();
- target.writeInt(numberEdges);
-
- for (int j = 0; j < numberEdges; j++) {
- int sourceIndex = source.readInt();
- int targetIndex = source.readInt();
-
- target.writeInt(sourceIndex);
- target.writeInt(targetIndex);
-
- DeweyNumber version = versionSerializer.deserialize(source);
- versionSerializer.serialize(version, target);
- }
+ throw new UnsupportedOperationException();
}
@Override
@@ -1046,7 +326,7 @@ public class SharedBuffer<K, V> {
SharedBufferSerializer other = (SharedBufferSerializer) obj;
return
- Objects.equals(keySerializer, other.getKeySerializer()) &&
+ Objects.equals(keySerializer, other.getKeySerializer()) &&
Objects.equals(valueSerializer, other.getValueSerializer()) &&
Objects.equals(versionSerializer, other.getVersionSerializer());
}
@@ -1064,47 +344,48 @@ public class SharedBuffer<K, V> {
@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
return new SharedBufferSerializerConfigSnapshot<>(
- keySerializer,
- valueSerializer,
- versionSerializer);
+ keySerializer,
+ valueSerializer,
+ versionSerializer);
}
@Override
- public CompatibilityResult<SharedBuffer<K, V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ public CompatibilityResult<SharedBuffer<V>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof SharedBufferSerializerConfigSnapshot) {
List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializerConfigSnapshots =
- ((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
+ ((SharedBufferSerializerConfigSnapshot) configSnapshot).getNestedSerializersAndConfigs();
CompatibilityResult<K> keyCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- serializerConfigSnapshots.get(0).f0,
- UnloadableDummyTypeSerializer.class,
- serializerConfigSnapshots.get(0).f1,
- keySerializer);
+ serializerConfigSnapshots.get(0).f0,
+ UnloadableDummyTypeSerializer.class,
+ serializerConfigSnapshots.get(0).f1,
+ keySerializer);
CompatibilityResult<V> valueCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- serializerConfigSnapshots.get(1).f0,
- UnloadableDummyTypeSerializer.class,
- serializerConfigSnapshots.get(1).f1,
- valueSerializer);
+ serializerConfigSnapshots.get(1).f0,
+ UnloadableDummyTypeSerializer.class,
+ serializerConfigSnapshots.get(1).f1,
+ valueSerializer);
CompatibilityResult<DeweyNumber> versionCompatResult = CompatibilityUtil.resolveCompatibilityResult(
- serializerConfigSnapshots.get(2).f0,
- UnloadableDummyTypeSerializer.class,
- serializerConfigSnapshots.get(2).f1,
- versionSerializer);
+ serializerConfigSnapshots.get(2).f0,
+ UnloadableDummyTypeSerializer.class,
+ serializerConfigSnapshots.get(2).f1,
+ versionSerializer);
- if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() && !versionCompatResult.isRequiresMigration()) {
+ if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration() &&
+ !versionCompatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else {
if (keyCompatResult.getConvertDeserializer() != null
- && valueCompatResult.getConvertDeserializer() != null
- && versionCompatResult.getConvertDeserializer() != null) {
+ && valueCompatResult.getConvertDeserializer() != null
+ && versionCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
- new SharedBufferSerializer<>(
- new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
- new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
- new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
- ));
+ new SharedBufferSerializer<>(
+ new TypeDeserializerAdapter<>(keyCompatResult.getConvertDeserializer()),
+ new TypeDeserializerAdapter<>(valueCompatResult.getConvertDeserializer()),
+ new TypeDeserializerAdapter<>(versionCompatResult.getConvertDeserializer())
+ ));
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 5b9522b..dbb654c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,7 +18,6 @@
package org.apache.flink.cep.nfa.compiler;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.nfa.AfterMatchSkipStrategy;
import org.apache.flink.cep.nfa.NFA;
@@ -53,29 +52,10 @@ public class NFACompiler {
protected static final String ENDING_STATE_NAME = "$endState$";
/**
- * Compiles the given pattern into a {@link NFA}.
- *
- * @param pattern Definition of sequence pattern
- * @param inputTypeSerializer Serializer for the input type
- * @param timeoutHandling True if the NFA shall return timed out event patterns
- * @param <T> Type of the input events
- * @return Non-deterministic finite automaton representing the given pattern
- */
- public static <T> NFA<T> compile(
- Pattern<T, ?> pattern,
- TypeSerializer<T> inputTypeSerializer,
- boolean timeoutHandling) {
- NFAFactory<T> factory = compileFactory(pattern, inputTypeSerializer, timeoutHandling);
-
- return factory.createNFA();
- }
-
- /**
* Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
* multiple NFAs.
*
* @param pattern Definition of sequence pattern
- * @param inputTypeSerializer Serializer for the input type
* @param timeoutHandling True if the NFA shall return timed out event patterns
* @param <T> Type of the input events
* @return Factory for NFAs corresponding to the given pattern
@@ -83,15 +63,14 @@ public class NFACompiler {
@SuppressWarnings("unchecked")
public static <T> NFAFactory<T> compileFactory(
final Pattern<T, ?> pattern,
- final TypeSerializer<T> inputTypeSerializer,
boolean timeoutHandling) {
if (pattern == null) {
// return a factory for empty NFAs
- return new NFAFactoryImpl<>(inputTypeSerializer, 0, Collections.<State<T>>emptyList(), timeoutHandling);
+ return new NFAFactoryImpl<>(0, Collections.<State<T>>emptyList(), timeoutHandling);
} else {
final NFAFactoryCompiler<T> nfaFactoryCompiler = new NFAFactoryCompiler<>(pattern);
nfaFactoryCompiler.compileFactory();
- return new NFAFactoryImpl<>(inputTypeSerializer, nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
+ return new NFAFactoryImpl<>(nfaFactoryCompiler.getWindowTime(), nfaFactoryCompiler.getStates(), timeoutHandling);
}
}
@@ -900,18 +879,15 @@ public class NFACompiler {
private static final long serialVersionUID = 8939783698296714379L;
- private final TypeSerializer<T> inputTypeSerializer;
private final long windowTime;
private final Collection<State<T>> states;
private final boolean timeoutHandling;
private NFAFactoryImpl(
- TypeSerializer<T> inputTypeSerializer,
- long windowTime,
- Collection<State<T>> states,
- boolean timeoutHandling) {
+ long windowTime,
+ Collection<State<T>> states,
+ boolean timeoutHandling) {
- this.inputTypeSerializer = inputTypeSerializer;
this.windowTime = windowTime;
this.states = states;
this.timeoutHandling = timeoutHandling;
@@ -919,8 +895,7 @@ public class NFACompiler {
@Override
public NFA<T> createNFA() {
- return new NFA<>(
- inputTypeSerializer.duplicate(), windowTime, timeoutHandling, states);
+ return new NFA<>(states, windowTime, timeoutHandling);
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
new file mode 100644
index 0000000..9b99ea1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/EventId.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Composite key for events in {@link SharedBuffer}.
+ */
+public class EventId {
+ private final long id;
+ private final long timestamp;
+
+ public EventId(long id, long timestamp) {
+ this.id = id;
+ this.timestamp = timestamp;
+ }
+
+ public long getId() {
+ return id;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ EventId eventId = (EventId) o;
+ return id == eventId.id &&
+ timestamp == eventId.timestamp;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, timestamp);
+ }
+
+ @Override
+ public String toString() {
+ return "EventId{" +
+ "id=" + id +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+
+ /** {@link TypeSerializer} for {@link EventId}. */
+ public static class EventIdSerializer extends TypeSerializerSingleton<EventId> {
+
+ private static final long serialVersionUID = -5685733582601394497L;
+
+ private EventIdSerializer() {
+ }
+
+ public static final EventIdSerializer INSTANCE = new EventIdSerializer();
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public EventId createInstance() {
+ return null;
+ }
+
+ @Override
+ public EventId copy(EventId from) {
+ return new EventId(from.id, from.timestamp);
+ }
+
+ @Override
+ public EventId copy(EventId from, EventId reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return 2 * LongSerializer.INSTANCE.getLength();
+ }
+
+ @Override
+ public void serialize(EventId record, DataOutputView target) throws IOException {
+ LongSerializer.INSTANCE.serialize(record.id, target);
+ LongSerializer.INSTANCE.serialize(record.timestamp, target);
+ }
+
+ @Override
+ public EventId deserialize(DataInputView source) throws IOException {
+ Long id = LongSerializer.INSTANCE.deserialize(source);
+ Long timestamp = LongSerializer.INSTANCE.deserialize(source);
+
+ return new EventId(id, timestamp);
+ }
+
+ @Override
+ public EventId deserialize(EventId reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ LongSerializer.INSTANCE.copy(source, target);
+ LongSerializer.INSTANCE.copy(source, target);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj.getClass().equals(EventIdSerializer.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
new file mode 100644
index 0000000..ca1ecae
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/Lockable.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.CompatibilityResult;
+import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Implements locking logic for incoming event and
+ * {@link SharedBufferNode} using a lock reference counter.
+ */
+public final class Lockable<T> {
+
+ private int refCounter;
+
+ private final T element;
+
+ public Lockable(T element, int refCounter) {
+ this.refCounter = refCounter;
+ this.element = element;
+ }
+
+ public void lock() {
+ refCounter += 1;
+ }
+
+ /**
+ * Releases lock on this object. If no more locks are acquired on it, this method will return true.
+ *
+ * @return true if no more locks are acquired
+ */
+ boolean release() {
+ if (refCounter <= 0) {
+ return true;
+ }
+
+ refCounter -= 1;
+ return refCounter == 0;
+ }
+
+ int getRefCounter() {
+ return refCounter;
+ }
+
+ public T getElement() {
+ return element;
+ }
+
+ @Override
+ public String toString() {
+ return "Lock{" +
+ "refCounter=" + refCounter +
+ '}';
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Lockable<?> lockable = (Lockable<?>) o;
+ return refCounter == lockable.refCounter &&
+ Objects.equals(element, lockable.element);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(refCounter, element);
+ }
+
+ /** Serializer for {@link Lockable}. */
+ public static class LockableTypeSerializer<E> extends TypeSerializer<Lockable<E>> {
+ private static final long serialVersionUID = 3298801058463337340L;
+ private final TypeSerializer<E> elementSerializer;
+
+ LockableTypeSerializer(TypeSerializer<E> elementSerializer) {
+ this.elementSerializer = elementSerializer;
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public TypeSerializer<Lockable<E>> duplicate() {
+ return new LockableTypeSerializer<>(elementSerializer);
+ }
+
+ @Override
+ public Lockable<E> createInstance() {
+ return null;
+ }
+
+ @Override
+ public Lockable<E> copy(Lockable<E> from) {
+ return new Lockable<E>(elementSerializer.copy(from.element), from.refCounter);
+ }
+
+ @Override
+ public Lockable<E> copy(
+ Lockable<E> from, Lockable<E> reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(Lockable<E> record, DataOutputView target) throws IOException {
+ IntSerializer.INSTANCE.serialize(record.refCounter, target);
+ elementSerializer.serialize(record.element, target);
+ }
+
+ @Override
+ public Lockable<E> deserialize(DataInputView source) throws IOException {
+ Integer refCount = IntSerializer.INSTANCE.deserialize(source);
+ E record = elementSerializer.deserialize(source);
+ return new Lockable<>(record, refCount);
+ }
+
+ @Override
+ public Lockable<E> deserialize(
+ Lockable<E> reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ IntSerializer.INSTANCE.copy(source, target); // refCounter
+
+ E element = elementSerializer.deserialize(source);
+ elementSerializer.serialize(element, target);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ LockableTypeSerializer<?> that = (LockableTypeSerializer<?>) o;
+ return Objects.equals(elementSerializer, that.elementSerializer);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(elementSerializer);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj.getClass().equals(LockableTypeSerializer.class);
+ }
+
+ @Override
+ public TypeSerializerConfigSnapshot snapshotConfiguration() {
+ return elementSerializer.snapshotConfiguration();
+ }
+
+ @Override
+ public CompatibilityResult<Lockable<E>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
+ CompatibilityResult<E> inputComaptibilityResult = elementSerializer.ensureCompatibility(configSnapshot);
+ if (inputComaptibilityResult.isRequiresMigration()) {
+ return CompatibilityResult.requiresMigration(new LockableTypeSerializer<>(
+ new TypeDeserializerAdapter<>(inputComaptibilityResult.getConvertDeserializer()))
+ );
+ } else {
+ return CompatibilityResult.compatible();
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
new file mode 100644
index 0000000..3a13184
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/NodeId.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Unique identifier for {@link SharedBufferNode}.
+ */
+public class NodeId {
+
+ private final String pageName;
+ private final EventId eventId;
+
+ public NodeId(EventId eventId, String pageName) {
+ this.eventId = eventId;
+ this.pageName = pageName;
+ }
+
+ public EventId getEventId() {
+ return eventId;
+ }
+
+ public String getPageName() {
+ return pageName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ NodeId nodeId = (NodeId) o;
+ return Objects.equals(eventId, nodeId.eventId) &&
+ Objects.equals(pageName, nodeId.pageName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(eventId, pageName);
+ }
+
+ @Override
+ public String toString() {
+ return "NodeId{" +
+ "eventId=" + eventId +
+ ", pageName='" + pageName + '\'' +
+ '}';
+ }
+
+ /** Serializer for {@link NodeId}. */
+ public static class NodeIdSerializer extends TypeSerializerSingleton<NodeId> {
+
+ private static final long serialVersionUID = 9209498028181378582L;
+
+ public static final NodeIdSerializer INSTANCE = new NodeIdSerializer();
+
+ private NodeIdSerializer() {
+ }
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public NodeId createInstance() {
+ return null;
+ }
+
+ @Override
+ public NodeId copy(NodeId from) {
+ return new NodeId(from.eventId, from.pageName);
+ }
+
+ @Override
+ public NodeId copy(NodeId from, NodeId reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(NodeId record, DataOutputView target) throws IOException {
+ if (record != null) {
+ target.writeByte(1);
+ EventId.EventIdSerializer.INSTANCE.serialize(record.eventId, target);
+ StringSerializer.INSTANCE.serialize(record.pageName, target);
+ } else {
+ target.writeByte(0);
+ }
+ }
+
+ @Override
+ public NodeId deserialize(DataInputView source) throws IOException {
+ byte b = source.readByte();
+ if (b == 0) {
+ return null;
+ }
+
+ EventId eventId = EventId.EventIdSerializer.INSTANCE.deserialize(source);
+ String pageName = StringSerializer.INSTANCE.deserialize(source);
+ return new NodeId(eventId, pageName);
+ }
+
+ @Override
+ public NodeId deserialize(NodeId reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ target.writeByte(source.readByte());
+
+ LongSerializer.INSTANCE.copy(source, target); // eventId
+ LongSerializer.INSTANCE.copy(source, target); // timestamp
+ StringSerializer.INSTANCE.copy(source, target); // pageName
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj.getClass().equals(NodeIdSerializer.class);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
new file mode 100644
index 0000000..50d997c
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBuffer.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOVICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. Vhe ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.state.KeyedStateStore;
+import org.apache.flink.api.common.state.MapState;
+import org.apache.flink.api.common.state.MapStateDescriptor;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.DeweyNumber;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables;
+
+import org.apache.commons.lang3.StringUtils;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.cep.nfa.compiler.NFAStateNameHandler.getOriginalNameFromInternal;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A shared buffer implementation which stores values under according state. Additionally, the values can be
+ * versioned such that it is possible to retrieve their predecessor element in the buffer.
+ *
+ * <p>The idea of the implementation is to have a buffer for incoming events with unique ids assigned to them. This way
+ * we do not need to deserialize events during processing and we store only one copy of the event.
+ *
+ * <p>The entries in {@link SharedBuffer} are {@link SharedBufferNode}. The shared buffer node allows to store
+ * relations between different entries. A dewey versioning scheme allows to discriminate between
+ * different relations (e.g. preceding element).
+ *
+ * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ *
+ * @param <V> Type of the values
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ */
+public class SharedBuffer<V> {
+
+ private static final String entriesStateName = "sharedBuffer-entries";
+ private static final String eventsStateName = "sharedBuffer-events";
+ private static final String eventsCountStateName = "sharedBuffer-events-count";
+
+ /** The buffer holding the unique events seen so far. */
+ private MapState<EventId, Lockable<V>> eventsBuffer;
+
+ /** The number of events seen so far in the stream per timestamp. */
+ private MapState<Long, Long> eventsCount;
+ private MapState<NodeId, Lockable<SharedBufferNode>> pages;
+
+ public SharedBuffer(KeyedStateStore stateStore, TypeSerializer<V> valueSerializer) {
+ this.eventsBuffer = stateStore.getMapState(
+ new MapStateDescriptor<>(
+ eventsStateName,
+ EventId.EventIdSerializer.INSTANCE,
+ new Lockable.LockableTypeSerializer<>(valueSerializer)));
+
+ this.pages = stateStore.getMapState(
+ new MapStateDescriptor<>(
+ entriesStateName,
+ NodeId.NodeIdSerializer.INSTANCE,
+ new Lockable.LockableTypeSerializer<>(new SharedBufferNode.SharedBufferNodeSerializer())));
+
+ this.eventsCount = stateStore.getMapState(
+ new MapStateDescriptor<>(
+ eventsCountStateName,
+ LongSerializer.INSTANCE,
+ LongSerializer.INSTANCE));
+ }
+
+ /**
+ * Adds another unique event to the shared buffer and assigns a unique id for it. It automatically creates a
+ * lock on this event, so it won't be removed during processing of that event. Therefore the lock should be removed
+ * after processing all {@link org.apache.flink.cep.nfa.ComputationState}s
+ *
+ * <p><b>NOTE:</b>Should be called only once for each unique event!
+ *
+ * @param value event to be registered
+ * @return unique id of that event that should be used when putting entries to the buffer.
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public EventId registerEvent(V value, long timestamp) throws Exception {
+ Long id = eventsCount.get(timestamp);
+ if (id == null) {
+ id = 0L;
+ }
+
+ EventId eventId = new EventId(id, timestamp);
+ eventsBuffer.put(eventId, new Lockable<>(value, 1));
+ eventsCount.put(timestamp, id + 1L);
+ return eventId;
+ }
+
+ /**
+ * Initializes underlying state with given map of events and entries. Should be used only in case of migration from
+ * old state.
+ *
+ * @param events map of events with assigned unique ids
+ * @param entries map of SharedBufferNodes
+ * @throws Exception Thrown if the system cannot access the state.
+ * @deprecated Only for state migration!
+ */
+ @Deprecated
+ public void init(
+ Map<EventId, Lockable<V>> events,
+ Map<NodeId, Lockable<SharedBufferNode>> entries) throws Exception {
+ eventsBuffer.putAll(events);
+ pages.putAll(entries);
+
+ Map<Long, Long> maxIds = events.keySet().stream().collect(Collectors.toMap(
+ EventId::getTimestamp,
+ EventId::getId,
+ Math::max
+ ));
+ eventsCount.putAll(maxIds);
+ }
+
+ /**
+ * Stores given value (value + timestamp) under the given state. It assigns no preceding element
+ * relation to the entry.
+ *
+ * @param stateName name of the state that the event should be assigned to
+ * @param eventId unique id of event assigned by this SharedBuffer
+ * @param version Version of the previous relation
+ * @return assigned id of this entry
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public NodeId put(
+ final String stateName,
+ final EventId eventId,
+ final DeweyNumber version) throws Exception {
+
+ return put(stateName, eventId, null, version);
+ }
+
+ /**
+ * Stores given value (value + timestamp) under the given state. It assigns a preceding element
+ * relation to the previous entry.
+ *
+ * @param stateName name of the state that the event should be assigned to
+ * @param eventId unique id of event assigned by this SharedBuffer
+ * @param previousNodeId id of previous entry
+ * @param version Version of the previous relation
+ * @return assigned id of this element
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public NodeId put(
+ final String stateName,
+ final EventId eventId,
+ @Nullable final NodeId previousNodeId,
+ final DeweyNumber version) throws Exception {
+
+ if (previousNodeId != null) {
+ lockNode(previousNodeId);
+ }
+
+ NodeId currentNodeId = new NodeId(eventId, getOriginalNameFromInternal(stateName));
+ Lockable<SharedBufferNode> currentNode = pages.get(currentNodeId);
+ if (currentNode == null) {
+ currentNode = new Lockable<>(new SharedBufferNode(), 0);
+ lockEvent(eventId);
+ }
+
+ currentNode.getElement().addEdge(new SharedBufferEdge(
+ previousNodeId,
+ version));
+ pages.put(currentNodeId, currentNode);
+
+ return currentNodeId;
+ }
+
+ /**
+ * Checks if there is no elements in the buffer.
+ *
+ * @return true if there is no elements in the buffer
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public boolean isEmpty() throws Exception {
+ return Iterables.isEmpty(eventsBuffer.keys());
+ }
+
+ /**
+ * Returns all elements from the previous relation starting at the given entry.
+ *
+ * @param nodeId id of the starting entry
+ * @param version Version of the previous relation which shall be extracted
+ * @return Collection of previous relations starting with the given value
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public List<Map<String, List<V>>> extractPatterns(
+ final NodeId nodeId,
+ final DeweyNumber version) throws Exception {
+
+ List<Map<String, List<V>>> result = new ArrayList<>();
+
+ // stack to remember the current extraction states
+ Stack<ExtractionState> extractionStates = new Stack<>();
+
+ // get the starting shared buffer entry for the previous relation
+ Lockable<SharedBufferNode> entryLock = pages.get(nodeId);
+
+ if (entryLock != null) {
+ SharedBufferNode entry = entryLock.getElement();
+ extractionStates.add(new ExtractionState(Tuple2.of(nodeId, entry), version, new Stack<>()));
+
+ // use a depth first search to reconstruct the previous relations
+ while (!extractionStates.isEmpty()) {
+ final ExtractionState extractionState = extractionStates.pop();
+ // current path of the depth first search
+ final Stack<Tuple2<NodeId, SharedBufferNode>> currentPath = extractionState.getPath();
+ final Tuple2<NodeId, SharedBufferNode> currentEntry = extractionState.getEntry();
+
+ // termination criterion
+ if (currentEntry == null) {
+ final Map<String, List<V>> completePath = new LinkedHashMap<>();
+
+ while (!currentPath.isEmpty()) {
+ final NodeId currentPathEntry = currentPath.pop().f0;
+
+ String page = currentPathEntry.getPageName();
+ List<V> values = completePath
+ .computeIfAbsent(page, k -> new ArrayList<>());
+ values.add(eventsBuffer.get(currentPathEntry.getEventId()).getElement());
+ }
+ result.add(completePath);
+ } else {
+
+ // append state to the path
+ currentPath.push(currentEntry);
+
+ boolean firstMatch = true;
+ for (SharedBufferEdge edge : currentEntry.f1.getEdges()) {
+ // we can only proceed if the current version is compatible to the version
+ // of this previous relation
+ final DeweyNumber currentVersion = extractionState.getVersion();
+ if (currentVersion.isCompatibleWith(edge.getDeweyNumber())) {
+ final NodeId target = edge.getTarget();
+ Stack<Tuple2<NodeId, SharedBufferNode>> newPath;
+
+ if (firstMatch) {
+ // for the first match we don't have to copy the current path
+ newPath = currentPath;
+ firstMatch = false;
+ } else {
+ newPath = new Stack<>();
+ newPath.addAll(currentPath);
+ }
+
+ extractionStates.push(new ExtractionState(
+ target != null ? Tuple2.of(target, pages.get(target).getElement()) : null,
+ edge.getDeweyNumber(),
+ newPath));
+ }
+ }
+ }
+
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Increases the reference counter for the given entry so that it is not
+ * accidentally removed.
+ *
+ * @param node id of the entry
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public void lockNode(final NodeId node) throws Exception {
+ Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+ if (sharedBufferNode != null) {
+ sharedBufferNode.lock();
+ pages.put(node, sharedBufferNode);
+ }
+ }
+
+ /**
+ * Decreases the reference counter for the given entry so that it can be
+ * removed once the reference counter reaches 0.
+ *
+ * @param node id of the entry
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public void releaseNode(final NodeId node) throws Exception {
+ Lockable<SharedBufferNode> sharedBufferNode = pages.get(node);
+ if (sharedBufferNode != null) {
+ if (sharedBufferNode.release()) {
+ removeNode(node, sharedBufferNode.getElement());
+ } else {
+ pages.put(node, sharedBufferNode);
+ }
+ }
+ }
+
+ private void removeNode(NodeId node, SharedBufferNode sharedBufferNode) throws Exception {
+ pages.remove(node);
+ EventId eventId = node.getEventId();
+ releaseEvent(eventId);
+
+ for (SharedBufferEdge sharedBufferEdge : sharedBufferNode.getEdges()) {
+ releaseNode(sharedBufferEdge.getTarget());
+ }
+ }
+
+ private void lockEvent(EventId eventId) throws Exception {
+ Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+ checkState(
+ eventWrapper != null,
+ "Referring to non existent event with id %s",
+ eventId);
+ eventWrapper.lock();
+ eventsBuffer.put(eventId, eventWrapper);
+ }
+
+ /**
+ * Decreases the reference counter for the given event so that it can be
+ * removed once the reference counter reaches 0.
+ *
+ * @param eventId id of the event
+ * @throws Exception Thrown if the system cannot access the state.
+ */
+ public void releaseEvent(EventId eventId) throws Exception {
+ Lockable<V> eventWrapper = eventsBuffer.get(eventId);
+ if (eventWrapper != null) {
+ if (eventWrapper.release()) {
+ eventsBuffer.remove(eventId);
+ } else {
+ eventsBuffer.put(eventId, eventWrapper);
+ }
+ }
+ }
+
+ /**
+ * Helper class to store the extraction state while extracting a sequence of values following
+ * the versioned entry edges.
+ */
+ private static class ExtractionState {
+
+ private final Tuple2<NodeId, SharedBufferNode> entry;
+ private final DeweyNumber version;
+ private final Stack<Tuple2<NodeId, SharedBufferNode>> path;
+
+ ExtractionState(
+ final Tuple2<NodeId, SharedBufferNode> entry,
+ final DeweyNumber version,
+ final Stack<Tuple2<NodeId, SharedBufferNode>> path) {
+ this.entry = entry;
+ this.version = version;
+ this.path = path;
+ }
+
+ public Tuple2<NodeId, SharedBufferNode> getEntry() {
+ return entry;
+ }
+
+ public Stack<Tuple2<NodeId, SharedBufferNode>> getPath() {
+ return path;
+ }
+
+ public DeweyNumber getVersion() {
+ return version;
+ }
+
+ @Override
+ public String toString() {
+ return "ExtractionState(" + entry + ", " + version + ", [" +
+ StringUtils.join(path, ", ") + "])";
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
new file mode 100644
index 0000000..c8d9021
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferEdge.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.cep.nfa.DeweyNumber;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+
+/**
+ * Versioned edge in {@link SharedBuffer} that allows retrieving predecessors.
+ */
+public class SharedBufferEdge {
+
+ private final NodeId target;
+ private final DeweyNumber deweyNumber;
+
+ /**
+ * Creates versioned (with {@link DeweyNumber}) edge that points to the target entry.
+ *
+ * @param target id of target entry
+ * @param deweyNumber version for this edge
+ */
+ public SharedBufferEdge(NodeId target, DeweyNumber deweyNumber) {
+ this.target = target;
+ this.deweyNumber = deweyNumber;
+ }
+
+ NodeId getTarget() {
+ return target;
+ }
+
+ DeweyNumber getDeweyNumber() {
+ return deweyNumber;
+ }
+
+ @Override
+ public String toString() {
+ return "SharedBufferEdge{" +
+ "target=" + target +
+ ", deweyNumber=" + deweyNumber +
+ '}';
+ }
+
+ /** Serializer for {@link SharedBufferEdge}. */
+ public static class SharedBufferEdgeSerializer extends TypeSerializerSingleton<SharedBufferEdge> {
+
+ private static final long serialVersionUID = -5122474955050663979L;
+
+ static final SharedBufferEdgeSerializer INSTANCE = new SharedBufferEdgeSerializer();
+
+ private SharedBufferEdgeSerializer() {}
+
+ @Override
+ public boolean isImmutableType() {
+ return true;
+ }
+
+ @Override
+ public SharedBufferEdge createInstance() {
+ return null;
+ }
+
+ @Override
+ public SharedBufferEdge copy(SharedBufferEdge from) {
+ return new SharedBufferEdge(from.target, from.deweyNumber);
+ }
+
+ @Override
+ public SharedBufferEdge copy(SharedBufferEdge from, SharedBufferEdge reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(SharedBufferEdge record, DataOutputView target) throws IOException {
+ NodeId.NodeIdSerializer.INSTANCE.serialize(record.target, target);
+ DeweyNumber.DeweyNumberSerializer.INSTANCE.serialize(record.deweyNumber, target);
+ }
+
+ @Override
+ public SharedBufferEdge deserialize(DataInputView source) throws IOException {
+ NodeId target = NodeId.NodeIdSerializer.INSTANCE.deserialize(source);
+ DeweyNumber deweyNumber = DeweyNumber.DeweyNumberSerializer.INSTANCE.deserialize(source);
+ return new SharedBufferEdge(target, deweyNumber);
+ }
+
+ @Override
+ public SharedBufferEdge deserialize(SharedBufferEdge reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ NodeId.NodeIdSerializer.INSTANCE.copy(source, target);
+ DeweyNumber.DeweyNumberSerializer.INSTANCE.copy(source, target);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj.getClass().equals(SharedBufferEdgeSerializer.class);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/9218df82/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
new file mode 100644
index 0000000..b613625
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/sharedbuffer/SharedBufferNode.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.sharedbuffer;
+
+import org.apache.flink.api.common.typeutils.base.ListSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
+import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferEdge.SharedBufferEdgeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * An entry in {@link SharedBuffer} that allows to store relations between different entries.
+ */
+public class SharedBufferNode {
+
+ private final List<SharedBufferEdge> edges;
+
+ public SharedBufferNode() {
+ edges = new ArrayList<>();
+ }
+
+ private SharedBufferNode(List<SharedBufferEdge> edges) {
+ this.edges = edges;
+ }
+
+ public List<SharedBufferEdge> getEdges() {
+ return edges;
+ }
+
+ public void addEdge(SharedBufferEdge edge) {
+ edges.add(edge);
+ }
+
+ @Override
+ public String toString() {
+ return "SharedBufferNode{" +
+ "edges=" + edges +
+ '}';
+ }
+
+ /** Serializer for {@link SharedBufferNode}. */
+ public static class SharedBufferNodeSerializer extends TypeSerializerSingleton<SharedBufferNode> {
+
+ private static final long serialVersionUID = -6687780732295439832L;
+
+ private final ListSerializer<SharedBufferEdge> edgesSerializer =
+ new ListSerializer<>(SharedBufferEdgeSerializer.INSTANCE);
+
+ @Override
+ public boolean isImmutableType() {
+ return false;
+ }
+
+ @Override
+ public SharedBufferNode createInstance() {
+ return new SharedBufferNode(new ArrayList<>());
+ }
+
+ @Override
+ public SharedBufferNode copy(SharedBufferNode from) {
+ return new SharedBufferNode(edgesSerializer.copy(from.edges));
+ }
+
+ @Override
+ public SharedBufferNode copy(SharedBufferNode from, SharedBufferNode reuse) {
+ return copy(from);
+ }
+
+ @Override
+ public int getLength() {
+ return -1;
+ }
+
+ @Override
+ public void serialize(SharedBufferNode record, DataOutputView target) throws IOException {
+ edgesSerializer.serialize(record.edges, target);
+ }
+
+ @Override
+ public SharedBufferNode deserialize(DataInputView source) throws IOException {
+ List<SharedBufferEdge> edges = edgesSerializer.deserialize(source);
+ return new SharedBufferNode(edges);
+ }
+
+ @Override
+ public SharedBufferNode deserialize(SharedBufferNode reuse, DataInputView source) throws IOException {
+ return deserialize(source);
+ }
+
+ @Override
+ public void copy(DataInputView source, DataOutputView target) throws IOException {
+ edgesSerializer.copy(source, target);
+ }
+
+ @Override
+ public boolean canEqual(Object obj) {
+ return obj.getClass().equals(SharedBufferNodeSerializer.class);
+ }
+ }
+}