You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2017/06/08 13:45:46 UTC

flink git commit: [FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.

Repository: flink
Updated Branches:
  refs/heads/master bcaf816dc -> 5d3506e88


[FLINK-6772] [cep] Fix ordering (by timestamp) of matched events.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d3506e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d3506e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d3506e8

Branch: refs/heads/master
Commit: 5d3506e88f24ec0d1c2272f04570c745d319329b
Parents: bcaf816
Author: kkloudas <kk...@gmail.com>
Authored: Wed May 31 17:48:34 2017 +0200
Committer: kkloudas <kk...@gmail.com>
Committed: Thu Jun 8 15:41:38 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java | 88 +++++++++++---------
 .../org/apache/flink/cep/nfa/SharedBuffer.java  | 26 +++---
 .../flink/cep/nfa/compiler/NFACompiler.java     | 45 ++--------
 .../cep/nfa/compiler/NFAStateNameHandler.java   | 79 ++++++++++++++++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 64 ++++++++++++++
 .../apache/flink/cep/nfa/SharedBufferTest.java  | 73 +++++++++-------
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  2 +-
 7 files changed, 255 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index f438915..cac1601 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.nfa.compiler.NFAStateNameHandler;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
@@ -43,7 +44,6 @@ import org.apache.flink.util.Preconditions;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
-import com.google.common.collect.ListMultimap;
 
 import javax.annotation.Nullable;
 
@@ -231,7 +231,7 @@ public class NFA<T> implements Serializable {
 				}
 
 				eventSharedBuffer.release(
-						computationState.getPreviousState().getName(),
+						NFAStateNameHandler.getOriginalNameFromInternal(computationState.getPreviousState().getName()),
 						computationState.getEvent(),
 						computationState.getTimestamp(),
 						computationState.getCounter());
@@ -248,6 +248,7 @@ public class NFA<T> implements Serializable {
 			//if stop state reached in this path
 			boolean shouldDiscardPath = false;
 			for (final ComputationState<T> newComputationState: newComputationStates) {
+
 				if (newComputationState.isFinalState()) {
 					// we've reached a final state and can thus retrieve the matching event sequence
 					Map<String, List<T>> matchedPattern = extractCurrentMatches(newComputationState);
@@ -255,7 +256,8 @@ public class NFA<T> implements Serializable {
 
 					// remove found patterns because they are no longer needed
 					eventSharedBuffer.release(
-							newComputationState.getPreviousState().getName(),
+							NFAStateNameHandler.getOriginalNameFromInternal(
+									newComputationState.getPreviousState().getName()),
 							newComputationState.getEvent(),
 							newComputationState.getTimestamp(),
 							computationState.getCounter());
@@ -263,10 +265,11 @@ public class NFA<T> implements Serializable {
 					//reached stop state. release entry for the stop state
 					shouldDiscardPath = true;
 					eventSharedBuffer.release(
-						newComputationState.getPreviousState().getName(),
-						newComputationState.getEvent(),
-						newComputationState.getTimestamp(),
-						computationState.getCounter());
+							NFAStateNameHandler.getOriginalNameFromInternal(
+									newComputationState.getPreviousState().getName()),
+							newComputationState.getEvent(),
+							newComputationState.getTimestamp(),
+							computationState.getCounter());
 				} else {
 					// add new computation state; it will be processed once the next event arrives
 					statesToRetain.add(newComputationState);
@@ -278,10 +281,11 @@ public class NFA<T> implements Serializable {
 				// the buffer
 				for (final ComputationState<T> state : statesToRetain) {
 					eventSharedBuffer.release(
-						state.getPreviousState().getName(),
-						state.getEvent(),
-						state.getTimestamp(),
-						state.getCounter());
+							NFAStateNameHandler.getOriginalNameFromInternal(
+									state.getPreviousState().getName()),
+							state.getEvent(),
+							state.getTimestamp(),
+							state.getCounter());
 				}
 			} else {
 				computationStates.addAll(statesToRetain);
@@ -473,17 +477,20 @@ public class NFA<T> implements Serializable {
 					if (computationState.isStartState()) {
 						startTimestamp = timestamp;
 						counter = eventSharedBuffer.put(
-							currentState.getName(),
+							NFAStateNameHandler.getOriginalNameFromInternal(
+									currentState.getName()),
 							event,
 							timestamp,
 							currentVersion);
 					} else {
 						startTimestamp = computationState.getStartTimestamp();
 						counter = eventSharedBuffer.put(
-							currentState.getName(),
+							NFAStateNameHandler.getOriginalNameFromInternal(
+									currentState.getName()),
 							event,
 							timestamp,
-							previousState.getName(),
+							NFAStateNameHandler.getOriginalNameFromInternal(
+									previousState.getName()),
 							previousEvent,
 							computationState.getTimestamp(),
 							computationState.getCounter(),
@@ -530,10 +537,11 @@ public class NFA<T> implements Serializable {
 		if (computationState.getEvent() != null) {
 			// release the shared entry referenced by the current computation state.
 			eventSharedBuffer.release(
-				computationState.getPreviousState().getName(),
-				computationState.getEvent(),
-				computationState.getTimestamp(),
-				computationState.getCounter());
+					NFAStateNameHandler.getOriginalNameFromInternal(
+							computationState.getPreviousState().getName()),
+					computationState.getEvent(),
+					computationState.getTimestamp(),
+					computationState.getCounter());
 		}
 
 		return resultingComputationStates;
@@ -551,7 +559,9 @@ public class NFA<T> implements Serializable {
 		ComputationState<T> computationState = ComputationState.createState(
 				this, currentState, previousState, event, counter, timestamp, version, startTimestamp);
 		computationStates.add(computationState);
-		eventSharedBuffer.lock(previousState.getName(), event, timestamp, counter);
+
+		String originalStateName = NFAStateNameHandler.getOriginalNameFromInternal(previousState.getName());
+		eventSharedBuffer.lock(originalStateName, event, timestamp, counter);
 	}
 
 	private State<T> findFinalStateAfterProceed(State<T> state, T event, ComputationState<T> computationState) {
@@ -641,32 +651,34 @@ public class NFA<T> implements Serializable {
 			eventSerializer = nonDuplicatingTypeSerializer.getTypeSerializer();
 		}
 
-		Collection<ListMultimap<String, T>> paths = eventSharedBuffer.extractPatterns(
-				computationState.getPreviousState().getName(),
+		List<Map<String, List<T>>> paths = eventSharedBuffer.extractPatterns(
+				NFAStateNameHandler.getOriginalNameFromInternal(
+						computationState.getPreviousState().getName()),
 				computationState.getEvent(),
 				computationState.getTimestamp(),
 				computationState.getCounter(),
 				computationState.getVersion());
 
+		if (paths.isEmpty()) {
+			return new HashMap<>();
+		}
 		// for a given computation state, we cannot have more than one matching patterns.
-		Preconditions.checkState(paths.size() <= 1);
+		Preconditions.checkState(paths.size() == 1);
 
 		Map<String, List<T>> result = new HashMap<>();
-		for (ListMultimap<String, T> path: paths) {
-			for (String key: path.keySet()) {
-				List<T> events = path.get(key);
-
-				String originalKey = NFACompiler.getOriginalStateNameFromInternal(key);
-				List<T> values = result.get(originalKey);
-				if (values == null) {
-					values = new ArrayList<>(events.size());
-				}
+		Map<String, List<T>> path = paths.get(0);
+		for (String key: path.keySet()) {
+			List<T> events = path.get(key);
+
+			List<T> values = result.get(key);
+			if (values == null) {
+				values = new ArrayList<>(events.size());
+				result.put(key, values);
+			}
 
-				for (T event: events) {
-					// copy the element so that the user can change it
-					values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event));
-				}
-				result.put(originalKey, values);
+			for (T event: events) {
+				// copy the element so that the user can change it
+				values.add(eventSerializer.isImmutableType() ? event : eventSerializer.copy(event));
 			}
 		}
 		return result;
@@ -871,10 +883,6 @@ public class NFA<T> implements Serializable {
 			return null;
 		}
 
-		private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-			ois.defaultReadObject();
-		}
-
 		@Override
 		public NFA<T> copy(NFA<T> from) {
 			try {

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index a44b333..d592c65 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -11,7 +11,7 @@
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
- * WIVHOUV WARRANVIES OR CONDIVIONS OF ANY KIND, either express or implied.
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
@@ -34,8 +34,6 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import org.apache.commons.lang3.StringUtils;
 
 import java.io.ByteArrayInputStream;
@@ -217,14 +215,14 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	 * @param version Version of the previous relation which shall be extracted
 	 * @return Collection of previous relations starting with the given value
 	 */
-	public Collection<ListMultimap<K, V>> extractPatterns(
+	public List<Map<K, List<V>>> extractPatterns(
 			final K key,
 			final V value,
 			final long timestamp,
 			final int counter,
 			final DeweyNumber version) {
 
-		Collection<ListMultimap<K, V>> result = new ArrayList<>();
+		List<Map<K, List<V>>> result = new ArrayList<>();
 
 		// stack to remember the current extraction states
 		Stack<ExtractionState<K, V>> extractionStates = new Stack<>();
@@ -244,12 +242,18 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 				// termination criterion
 				if (currentEntry == null) {
-					final ListMultimap<K, V> completePath = ArrayListMultimap.create();
+					final Map<K, List<V>> completePath = new HashMap<>();
 
 					while (!currentPath.isEmpty()) {
 						final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
 
-						completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
+						K k = currentPathEntry.getKey();
+						List<V> values = completePath.get(k);
+						if (values == null) {
+							values = new ArrayList<>();
+							completePath.put(k, values);
+						}
+						values.add(currentPathEntry.getValueTime().getValue());
 					}
 
 					result.add(completePath);
@@ -777,12 +781,6 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 		ExtractionState(
 				final SharedBufferEntry<K, V> entry,
-				final DeweyNumber version) {
-			this(entry, version, null);
-		}
-
-		ExtractionState(
-				final SharedBufferEntry<K, V> entry,
 				final DeweyNumber version,
 				final Stack<SharedBufferEntry<K, V>> path) {
 			this.entry = entry;
@@ -942,7 +940,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 					ValueTimeWrapper<V> valueTimeWrapper = sharedBuffer.getValueTime();
 
-					valueSerializer.serialize(valueTimeWrapper.value, target);
+					valueSerializer.serialize(valueTimeWrapper.getValue(), target);
 					target.writeLong(valueTimeWrapper.getTimestamp());
 					target.writeInt(valueTimeWrapper.getCounter());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 8d1d366..ce42acd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -32,7 +32,6 @@ import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
-import org.apache.flink.util.Preconditions;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
@@ -44,10 +43,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
@@ -57,8 +54,6 @@ public class NFACompiler {
 
 	protected static final String ENDING_STATE_NAME = "$endState$";
 
-	protected static final String STATE_NAME_DELIM = ":";
-
 	/**
 	 * Compiles the given pattern into a {@link NFA}.
 	 *
@@ -77,11 +72,6 @@ public class NFACompiler {
 		return factory.createNFA();
 	}
 
-	public static String getOriginalStateNameFromInternal(String internalName) {
-		Preconditions.checkNotNull(internalName);
-		return internalName.split(STATE_NAME_DELIM)[0];
-	}
-
 	/**
 	 * Compiles the given pattern into a {@link NFAFactory}. The NFA factory can be used to create
 	 * multiple NFAs.
@@ -115,7 +105,7 @@ public class NFACompiler {
 	 */
 	static class NFAFactoryCompiler<T> {
 
-		private final Set<String> usedNames = new HashSet<>();
+		private final NFAStateNameHandler stateNameHandler = new NFAStateNameHandler();
 		private final Map<String, State<T>> stopStates = new HashMap<>();
 		private final List<State<T>> states = new ArrayList<>();
 
@@ -207,7 +197,8 @@ public class NFACompiler {
 				if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_FOLLOW) {
 					//skip notFollow patterns, they are converted into edge conditions
 				} else if (currentPattern.getQuantifier().getConsumingStrategy() == Quantifier.ConsumingStrategy.NOT_NEXT) {
-					checkPatternNameUniqueness(currentPattern.getName());
+					stateNameHandler.checkNameUniqueness(currentPattern.getName());
+
 					final State<T> notNext = createState(currentPattern.getName(), State.StateType.Normal);
 					final IterativeCondition<T> notCondition = (IterativeCondition<T>) currentPattern.getCondition();
 					final State<T> stopState = createStopState(notCondition, currentPattern.getName());
@@ -221,7 +212,7 @@ public class NFACompiler {
 					notNext.addProceed(stopState, notCondition);
 					lastSink = notNext;
 				} else {
-					checkPatternNameUniqueness(currentPattern.getName());
+					stateNameHandler.checkNameUniqueness(currentPattern.getName());
 					lastSink = convertPattern(lastSink);
 				}
 
@@ -246,7 +237,7 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createStartState(State<T> sinkState) {
-			checkPatternNameUniqueness(currentPattern.getName());
+			stateNameHandler.checkNameUniqueness(currentPattern.getName());
 			final State<T> beginningState = convertPattern(sinkState);
 			beginningState.makeStart();
 			return beginningState;
@@ -284,36 +275,12 @@ public class NFACompiler {
 		 * @return the created state
 		 */
 		private State<T> createState(String name, State.StateType stateType) {
-			String stateName = getUniqueInternalStateName(name);
-			usedNames.add(stateName);
+			String stateName = stateNameHandler.getUniqueInternalName(name);
 			State<T> state = new State<>(stateName, stateType);
 			states.add(state);
 			return state;
 		}
 
-		/**
-		 * Used to give a unique name to states created
-		 * during the translation process.
-		 *
-		 * @param baseName The base of the name.
-		 */
-		private String getUniqueInternalStateName(String baseName) {
-			int counter = 0;
-			String candidate = baseName;
-			while (usedNames.contains(candidate)) {
-				candidate = baseName + STATE_NAME_DELIM + counter++;
-			}
-			return candidate;
-		}
-
-		private void checkPatternNameUniqueness(String patternName) {
-			if (usedNames.contains(patternName)) {
-				throw new MalformedPatternException(
-						"Duplicate pattern name: " + patternName + ". " +
-								"Pattern names must be unique.");
-			}
-		}
-
 		private State<T> createStopState(final IterativeCondition<T> notCondition, final String name) {
 			// We should not duplicate the notStates. All states from which we can stop should point to the same one.
 			State<T> stopState = stopStates.get(name);

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
new file mode 100644
index 0000000..558b6f4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFAStateNameHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.compiler;
+
+import org.apache.flink.cep.pattern.MalformedPatternException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * A utility class used to handle name conventions and guarantee unique
+ * names for the states of our {@link org.apache.flink.cep.nfa.NFA}.
+ */
+public class NFAStateNameHandler {
+
+	private static final String STATE_NAME_DELIM = ":";
+
+	private final Set<String> usedNames = new HashSet<>();
+
+	/**
+	 * Implements the reverse process of the {@link #getUniqueInternalName(String)}.
+	 *
+	 * @param internalName The name to be decoded.
+	 * @return The original, user-specified name for the state.
+	 */
+	public static String getOriginalNameFromInternal(String internalName) {
+		Preconditions.checkNotNull(internalName);
+		return internalName.split(STATE_NAME_DELIM)[0];
+	}
+
+	/**
+	 * Checks if the given name is already used or not. If yes, it
+	 * throws a {@link MalformedPatternException}.
+	 *
+	 * @param name The name to be checked.
+	 */
+	public void checkNameUniqueness(String name) {
+		if (usedNames.contains(name)) {
+			throw new MalformedPatternException("Duplicate pattern name: " + name + ". Names must be unique.");
+		}
+	}
+
+	/**
+	 * Used to give a unique name to {@link org.apache.flink.cep.nfa.NFA} states
+	 * created during the translation process. The name format will be
+	 * {@code baseName:counter} , where the counter is increasing for states with
+	 * the same {@code baseName}.
+	 *
+	 * @param baseName The base of the name.
+	 * @return The (unique) name that is going to be used internally for the state.
+	 */
+	public String getUniqueInternalName(String baseName) {
+		int counter = 0;
+		String candidate = baseName;
+		while (usedNames.contains(candidate)) {
+			candidate = baseName + STATE_NAME_DELIM + counter++;
+		}
+		usedNames.add(candidate);
+		return candidate;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 92b49d3..20cb482 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
 
 import com.google.common.collect.Lists;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -2622,4 +2623,67 @@ public class NFAITCase extends TestLogger {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end)
 			));
 	}
+
+	@Test
+	public void testNFAResultOrdering() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent1 = new Event(41, "a-1", 2.0);
+		Event startEvent2 = new Event(41, "a-2", 3.0);
+		Event startEvent3 = new Event(41, "a-3", 4.0);
+		Event startEvent4 = new Event(41, "a-4", 5.0);
+		Event endEvent1 = new Event(41, "b-1", 6.0);
+		Event endEvent2 = new Event(41, "b-2", 7.0);
+		Event endEvent3 = new Event(41, "b-3", 8.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1));
+		inputEvents.add(new StreamRecord<>(startEvent2, 3));
+		inputEvents.add(new StreamRecord<>(startEvent3, 4));
+		inputEvents.add(new StreamRecord<>(startEvent4, 5));
+		inputEvents.add(new StreamRecord<>(endEvent1, 6));
+		inputEvents.add(new StreamRecord<>(endEvent2, 7));
+		inputEvents.add(new StreamRecord<>(endEvent3, 10));
+
+		Pattern<Event, ?> pattern = Pattern
+				.<Event>begin("start")
+				.where(new SimpleCondition<Event>() {
+					private static final long serialVersionUID = 6452194090480345053L;
+
+					@Override
+					public boolean filter(Event s) throws Exception {
+						return s.getName().startsWith("a-");
+					}
+				}).times(4).allowCombinations()
+				.followedByAny("middle")
+				.where(new SimpleCondition<Event>() {
+					private static final long serialVersionUID = -6838398439317275390L;
+
+					public boolean filter(Event s) throws Exception {
+						return s.getName().startsWith("b-");
+					}
+				}).times(3).consecutive();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<Map<String, List<Event>>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, List<Event>>> patterns = nfa.process(
+					inputEvent.getValue(),
+					inputEvent.getTimestamp()).f0;
+
+			resultingPatterns.addAll(patterns);
+		}
+
+		Assert.assertEquals(1L, resultingPatterns.size());
+
+		Map<String, List<Event>> match = resultingPatterns.get(0);
+		Assert.assertArrayEquals(
+				match.get("start").toArray(),
+				Lists.newArrayList(startEvent1, startEvent2, startEvent3, startEvent4).toArray());
+
+		Assert.assertArrayEquals(
+				match.get("middle").toArray(),
+				Lists.newArrayList(endEvent1, endEvent2, endEvent3).toArray());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index 44033c1..3621bad 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -24,15 +24,17 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -54,28 +56,43 @@ public class SharedBufferTest extends TestLogger {
 			events[i] = new Event(i + 1, "e" + (i + 1), i);
 		}
 
-		ListMultimap<String, Event> expectedPattern1 = ArrayListMultimap.create();
-		expectedPattern1.put("a1", events[2]);
-		expectedPattern1.put("a[]", events[3]);
-		expectedPattern1.put("b", events[5]);
-
-		ListMultimap<String, Event> expectedPattern2 = ArrayListMultimap.create();
-		expectedPattern2.put("a1", events[0]);
-		expectedPattern2.put("a[]", events[1]);
-		expectedPattern2.put("a[]", events[2]);
-		expectedPattern2.put("a[]", events[3]);
-		expectedPattern2.put("a[]", events[4]);
-		expectedPattern2.put("b", events[5]);
-
-		ListMultimap<String, Event> expectedPattern3 = ArrayListMultimap.create();
-		expectedPattern3.put("a1", events[0]);
-		expectedPattern3.put("a[]", events[1]);
-		expectedPattern3.put("a[]", events[2]);
-		expectedPattern3.put("a[]", events[3]);
-		expectedPattern3.put("a[]", events[4]);
-		expectedPattern3.put("a[]", events[5]);
-		expectedPattern3.put("a[]", events[6]);
-		expectedPattern3.put("b", events[7]);
+		Map<String, List<Event>> expectedPattern1 = new HashMap<>();
+		expectedPattern1.put("a1", new ArrayList<Event>());
+		expectedPattern1.get("a1").add(events[2]);
+
+		expectedPattern1.put("a[]", new ArrayList<Event>());
+		expectedPattern1.get("a[]").add(events[3]);
+
+		expectedPattern1.put("b", new ArrayList<Event>());
+		expectedPattern1.get("b").add(events[5]);
+
+		Map<String, List<Event>> expectedPattern2 = new HashMap<>();
+		expectedPattern2.put("a1", new ArrayList<Event>());
+		expectedPattern2.get("a1").add(events[0]);
+
+		expectedPattern2.put("a[]", new ArrayList<Event>());
+		expectedPattern2.get("a[]").add(events[1]);
+		expectedPattern2.get("a[]").add(events[2]);
+		expectedPattern2.get("a[]").add(events[3]);
+		expectedPattern2.get("a[]").add(events[4]);
+
+		expectedPattern2.put("b", new ArrayList<Event>());
+		expectedPattern2.get("b").add(events[5]);
+
+		Map<String, List<Event>> expectedPattern3 = new HashMap<>();
+		expectedPattern3.put("a1", new ArrayList<Event>());
+		expectedPattern3.get("a1").add(events[0]);
+
+		expectedPattern3.put("a[]", new ArrayList<Event>());
+		expectedPattern3.get("a[]").add(events[1]);
+		expectedPattern3.get("a[]").add(events[2]);
+		expectedPattern3.get("a[]").add(events[3]);
+		expectedPattern3.get("a[]").add(events[4]);
+		expectedPattern3.get("a[]").add(events[5]);
+		expectedPattern3.get("a[]").add(events[6]);
+
+		expectedPattern3.put("b", new ArrayList<Event>());
+		expectedPattern3.get("b").add(events[7]);
 
 		sharedBuffer.put("a1", events[0], timestamp, null, null, 0, 0, DeweyNumber.fromString("1"));
 		sharedBuffer.put("a[]", events[1], timestamp, "a1", events[0], timestamp, 0, DeweyNumber.fromString("1.0"));
@@ -90,12 +107,12 @@ public class SharedBufferTest extends TestLogger {
 		sharedBuffer.put("a[]", events[6], timestamp, "a[]", events[5], timestamp, 5, DeweyNumber.fromString("1.1"));
 		sharedBuffer.put("b", events[7], timestamp, "a[]", events[6], timestamp, 6, DeweyNumber.fromString("1.1.0"));
 
-		Collection<ListMultimap<String, Event>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
+		Collection<Map<String, List<Event>>> patterns3 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
 		sharedBuffer.release("b", events[7], timestamp, 7);
-		Collection<ListMultimap<String, Event>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
+		Collection<Map<String, List<Event>>> patterns4 = sharedBuffer.extractPatterns("b", events[7], timestamp, 7, DeweyNumber.fromString("1.1.0"));
 
-		Collection<ListMultimap<String, Event>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
-		Collection<ListMultimap<String, Event>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
+		Collection<Map<String, List<Event>>> patterns1 = sharedBuffer.extractPatterns("b", events[5], timestamp, 2, DeweyNumber.fromString("2.0.0"));
+		Collection<Map<String, List<Event>>> patterns2 = sharedBuffer.extractPatterns("b", events[5], timestamp, 5, DeweyNumber.fromString("1.0.0"));
 		sharedBuffer.release("b", events[5], timestamp, 2);
 		sharedBuffer.release("b", events[5], timestamp, 5);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5d3506e8/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index cd12071..6d4329a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -80,7 +80,7 @@ public class NFACompilerTest extends TestLogger {
 
 		// adjust the rule
 		expectedException.expect(MalformedPatternException.class);
-		expectedException.expectMessage("Duplicate pattern name: start. Pattern names must be unique.");
+		expectedException.expectMessage("Duplicate pattern name: start. Names must be unique.");
 
 		Pattern<Event, ?> invalidPattern = Pattern.<Event>begin("start").where(new TestFilter())
 			.followedBy("middle").where(new TestFilter())