You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/28 08:51:57 UTC

[1/3] flink git commit: [FLINK-6137] Activate strict checkstyle for flink-cep

Repository: flink
Updated Branches:
  refs/heads/master 4f50dc4df -> c9e574bf3


http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
new file mode 100644
index 0000000..7bf0767
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATestUtilities.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.primitives.Doubles;
+import org.junit.Assert;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base method for IT tests of {@link NFA}. It provides utility methods.
+ */
+public class NFATestUtilities {
+
+	public static List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
+		List<List<Event>> resultingPatterns = new ArrayList<>();
+
+		for (StreamRecord<Event> inputEvent : inputEvents) {
+			Collection<Map<String, List<Event>>> patterns = nfa.process(
+				inputEvent.getValue(),
+				inputEvent.getTimestamp()).f0;
+
+			for (Map<String, List<Event>> p: patterns) {
+				List<Event> res = new ArrayList<>();
+				for (List<Event> le: p.values()) {
+					res.addAll(le);
+				}
+				resultingPatterns.add(res);
+			}
+		}
+		return resultingPatterns;
+	}
+
+	public static void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
+		Assert.assertEquals(expected.size(), actual.size());
+
+		for (List<Event> p: actual) {
+			Collections.sort(p, new EventComparator());
+		}
+
+		for (List<Event> p: expected) {
+			Collections.sort(p, new EventComparator());
+		}
+
+		Collections.sort(actual, new ListEventComparator());
+		Collections.sort(expected, new ListEventComparator());
+		Assert.assertArrayEquals(expected.toArray(), actual.toArray());
+	}
+
+	private static class ListEventComparator implements Comparator<List<Event>> {
+
+		@Override
+		public int compare(List<Event> o1, List<Event> o2) {
+			int sizeComp = Integer.compare(o1.size(), o2.size());
+			if (sizeComp == 0) {
+				EventComparator comp = new EventComparator();
+				for (int i = 0; i < o1.size(); i++) {
+					int eventComp = comp.compare(o1.get(i), o2.get(i));
+					if (eventComp != 0) {
+						return eventComp;
+					}
+				}
+				return 0;
+			} else {
+				return sizeComp;
+			}
+		}
+	}
+
+	private static class EventComparator implements Comparator<Event> {
+
+		@Override
+		public int compare(Event o1, Event o2) {
+			int nameComp = o1.getName().compareTo(o2.getName());
+			int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice());
+			int idComp = Integer.compare(o1.getId(), o2.getId());
+			if (nameComp == 0) {
+				if (priceComp == 0) {
+					return idComp;
+				} else {
+					return priceComp;
+				}
+			} else {
+				return nameComp;
+			}
+		}
+	}
+
+	private NFATestUtilities() {
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
new file mode 100644
index 0000000..3b95eb4
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NotPatternITCase.java
@@ -0,0 +1,1036 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link Pattern#notFollowedBy(String)} and {@link Pattern#notNext(String)}.
+ */
+@SuppressWarnings("unchecked")
+public class NotPatternITCase extends TestLogger {
+
+	@Test
+	public void testNotNext() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5167288560432018992L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2242479288129905510L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1404509325548220892L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -8907427230007830915L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d),
+			Lists.newArrayList(a1, c2, d)
+		));
+	}
+
+	@Test
+	public void testNotNextNoMatches() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -339500190577666439L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -6913980632538046451L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3332196998905139891L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2086563479959018387L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotNextNoMatchesAtTheEnd() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+		Event b1 = new Event(42, "b", 3.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(c2, 3));
+		inputEvents.add(new StreamRecord<>(d, 4));
+		inputEvents.add(new StreamRecord<>(b1, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1672995058886176627L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6003621617520261554L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 887700237024758417L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		}).notNext("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5239529076086933032L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -2641662468313191976L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -3632144132379494778L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3818766882138348167L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2033204730795451288L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptional() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -2454396370205097543L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2749547391611263290L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -4989511337298217255L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -8466223836652936608L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, d)
+		));
+	}
+
+	@Test
+	public void testTimesWithNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(41, "b", 2.0);
+		Event c = new Event(42, "c", 3.0);
+		Event b2 = new Event(43, "b", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c, 3));
+		inputEvents.add(new StreamRecord<>(b2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -2568839911852184515L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -3632232424064269636L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3685596793523534611L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 1960758663575587243L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList());
+	}
+
+	@Test
+	public void testIgnoreStateOfTimesWithNotFollowedBy() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event e = new Event(41, "e", 2.0);
+		Event c1 = new Event(42, "c", 3.0);
+		Event b1 = new Event(43, "b", 4.0);
+		Event c2 = new Event(44, "c", 5.0);
+		Event d1 = new Event(45, "d", 6.0);
+		Event d2 = new Event(46, "d", 7.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(d1, 2));
+		inputEvents.add(new StreamRecord<>(e, 1));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2814850350025111940L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4988756153568853834L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -225909103322018778L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -924294627956373696L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, d1)
+		));
+	}
+
+	@Test
+	public void testTimesWithNotFollowedByAfter() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event e = new Event(41, "e", 2.0);
+		Event c1 = new Event(42, "c", 3.0);
+		Event b1 = new Event(43, "b", 4.0);
+		Event b2 = new Event(44, "b", 5.0);
+		Event d1 = new Event(46, "d", 7.0);
+		Event d2 = new Event(47, "d", 8.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(d1, 2));
+		inputEvents.add(new StreamRecord<>(e, 1));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(b2, 3));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(d2, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6193105689601702341L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5195859580923169111L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4973027956103783831L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 2724622546678984894L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList());
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptionalAtTheEnd() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -4289351792573443294L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -4989574608417523507L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -5940131818629290579L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).optional();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1),
+			Lists.newArrayList(a1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOptionalTimes() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c2 = new Event(43, "c", 4.0);
+		Event d = new Event(43, "d", 4.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(c1, 2));
+		inputEvents.add(new StreamRecord<>(b1, 3));
+		inputEvents.add(new StreamRecord<>(c2, 4));
+		inputEvents.add(new StreamRecord<>(d, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -7885381452276160322L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 3471511260235826653L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 9073793782452363833L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 7972902718259767076L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a1, c1, c2, d)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByWithBranchingAtStart() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a1 = new Event(40, "a", 1.0);
+		Event b1 = new Event(42, "b", 3.0);
+		Event c1 = new Event(41, "c", 2.0);
+		Event a2 = new Event(41, "a", 4.0);
+		Event c2 = new Event(43, "c", 5.0);
+		Event d = new Event(43, "d", 6.0);
+
+		inputEvents.add(new StreamRecord<>(a1, 1));
+		inputEvents.add(new StreamRecord<>(b1, 2));
+		inputEvents.add(new StreamRecord<>(c1, 3));
+		inputEvents.add(new StreamRecord<>(a2, 4));
+		inputEvents.add(new StreamRecord<>(c2, 5));
+		inputEvents.add(new StreamRecord<>(d, 6));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -7866220136345465444L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 4957837489028234932L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5569569968862808007L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = -8579678167937416269L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("d");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a2, c2, d)
+		));
+	}
+
+	private static class NotFollowByData {
+		static final Event A_1 = new Event(40, "a", 1.0);
+		static final Event B_1 = new Event(41, "b", 2.0);
+		static final Event B_2 = new Event(42, "b", 3.0);
+		static final Event B_3 = new Event(42, "b", 4.0);
+		static final Event C_1 = new Event(43, "c", 5.0);
+		static final Event B_4 = new Event(42, "b", 6.0);
+		static final Event B_5 = new Event(42, "b", 7.0);
+		static final Event B_6 = new Event(42, "b", 8.0);
+		static final Event D_1 = new Event(43, "d", 9.0);
+
+		private NotFollowByData() {
+		}
+	}
+
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillNext() {
+		final List<List<Event>> matches = testNotNextAfterOneOrMore(false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotNextAfterOneOrMoreSkipTillAny() {
+		final List<List<Event>> matches = testNotNextAfterOneOrMore(true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_2, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotNextAfterOneOrMore(boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i++));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).oneOrMore()
+			.notNext("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByNextAfterOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, false);
+		assertEquals(0, matches.size());
+	}
+
+	@Test
+	public void testNotFollowedByAnyAfterOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_2, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_3, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			});
+
+		pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations())
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			})
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByAnyBeforeOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, true);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByAnyBeforeOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, true);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOneOrMoreEager() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, false);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeOneOrMoreCombinations() {
+		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, false);
+
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			})
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			}).oneOrMore();
+
+		pattern = (eager ? pattern : pattern.allowCombinations())
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, false);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, false);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1)
+		));
+	}
+
+	@Test
+	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() {
+		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, true);
+		compareMaps(matches, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_4, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.B_6, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_5, NotFollowByData.D_1),
+			Lists.newArrayList(NotFollowByData.A_1, NotFollowByData.B_1, NotFollowByData.B_6, NotFollowByData.D_1)
+		));
+	}
+
+	private List<List<Event>> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		int i = 0;
+		inputEvents.add(new StreamRecord<>(NotFollowByData.A_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.C_1, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_4, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_5, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.B_6, i++));
+		inputEvents.add(new StreamRecord<>(NotFollowByData.D_1, i));
+
+		Pattern<Event, ?> pattern = Pattern
+			.<Event>begin("a").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("a");
+				}
+			})
+			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("c");
+				}
+			});
+
+		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
+			.where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("b");
+				}
+			}).oneOrMore().optional();
+
+		pattern = (eager ? pattern : pattern.allowCombinations())
+			.followedBy("d").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 5726188262756267490L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("d");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
new file mode 100644
index 0000000..d378a74
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SameElementITCase.java
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * Tests for handling Events that are equal in case of {@link Object#equals(Object)} and have same timestamps.
+ */
+@SuppressWarnings("unchecked")
+public class SameElementITCase extends TestLogger {
+
+	@Test
+	public void testEagerZeroOrMoreSameElement() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(end1, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, end1)
+		));
+	}
+
+	@Test
+	public void testZeroOrMoreSameElement() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent1a = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event middleEvent3a = new Event(43, "a", 4.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
+		inputEvents.add(new StreamRecord<>(middleEvent3a, 6));
+		inputEvents.add(new StreamRecord<>(end1, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
+
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
+
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent2, middleEvent3, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, middleEvent3a, end1),
+
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent2, middleEvent3a, end1),
+			Lists.newArrayList(startEvent, middleEvent3, middleEvent3a, end1),
+
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1a, end1),
+			Lists.newArrayList(startEvent, middleEvent2, end1),
+			Lists.newArrayList(startEvent, middleEvent3, end1),
+			Lists.newArrayList(startEvent, middleEvent3a, end1),
+
+			Lists.newArrayList(startEvent, end1)
+		));
+	}
+
+	@Test
+	public void testSimplePatternWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event end1 = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(end1, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).followedBy("end1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, end1),
+			Lists.newArrayList(startEvent, middleEvent1, end1)
+		));
+	}
+
+	@Test
+	public void testIterativeConditionWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent1a = new Event(41, "a", 2.0);
+		Event middleEvent1b = new Event(41, "a", 2.0);
+		final Event end = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
+		inputEvents.add(new StreamRecord<>(end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().allowCombinations().followedBy("end").where(new IterativeCondition<Event>() {
+
+			private static final long serialVersionUID = -5566639743229703237L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				double sum = 0.0;
+				for (Event event: ctx.getEventsForPattern("middle")) {
+					sum += event.getPrice();
+				}
+				return Double.compare(sum, 4.0) == 0;
+			}
+
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b, end)
+		));
+	}
+
+	@Test
+	public void testEndWLoopingWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent1a = new Event(41, "a", 2.0);
+		Event middleEvent1b = new Event(41, "a", 2.0);
+		final Event end = new Event(44, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
+		inputEvents.add(new StreamRecord<>(end, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent),
+			Lists.newArrayList(startEvent, middleEvent1),
+			Lists.newArrayList(startEvent, middleEvent1a),
+			Lists.newArrayList(startEvent, middleEvent1b),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a),
+			Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b)
+		));
+	}
+
+	@Test
+	public void testRepeatingPatternWSameElement() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middle1Event1 = new Event(40, "a", 2.0);
+		Event middle1Event2 = new Event(40, "a", 3.0);
+		Event middle1Event3 = new Event(40, "a", 4.0);
+		Event middle2Event1 = new Event(40, "b", 5.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middle1Event1, 3));
+		inputEvents.add(new StreamRecord<>(middle1Event1, 3));
+		inputEvents.add(new StreamRecord<>(middle1Event2, 3));
+		inputEvents.add(new StreamRecord<>(new Event(40, "d", 6.0), 5));
+		inputEvents.add(new StreamRecord<>(middle2Event1, 6));
+		inputEvents.add(new StreamRecord<>(middle1Event3, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("b");
+			}
+		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middle1Event1),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1),
+			Lists.newArrayList(startEvent, middle2Event1, middle1Event3),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2),
+			Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle2Event1, middle1Event3),
+
+			Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
+		));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
index bd828b6..44033c1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/SharedBufferTest.java
@@ -18,20 +18,19 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.cep.Event;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
 import java.util.Collection;
 import java.util.Collections;
 
@@ -39,6 +38,9 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link SharedBuffer}.
+ */
 public class SharedBufferTest extends TestLogger {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
index 26b8ce9..cd12071 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/compiler/NFACompilerTest.java
@@ -32,6 +32,7 @@ import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -45,6 +46,9 @@ import static com.google.common.collect.Sets.newHashSet;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link NFACompiler}.
+ */
 public class NFACompilerTest extends TestLogger {
 
 	private static final SimpleCondition<Event> startFilter = new SimpleCondition<Event>() {
@@ -116,7 +120,7 @@ public class NFACompilerTest extends TestLogger {
 	}
 
 	/**
-	 * Tests that the NFACompiler generates the correct NFA from a given Pattern
+	 * Tests that the NFACompiler generates the correct NFA from a given Pattern.
 	 */
 	@Test
 	public void testNFACompilerWithSimplePattern() {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
index 0345192..f5a909b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPFrom12MigrationTest.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
@@ -33,6 +34,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OperatorSnapshotUtil;
+
 import org.junit.Ignore;
 import org.junit.Test;
 
@@ -463,7 +465,6 @@ public class CEPFrom12MigrationTest {
 		}
 	}
 
-
 	@Test
 	public void testSinglePatternAfterMigration() throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index c92f772..69ba42f 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.Test;
 
 import java.net.URL;
@@ -45,6 +46,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for migration from 1.1.x to 1.3.x.
+ */
 public class CEPMigration11to13Test {
 
 	private static String getResourceFilename(String filename) {
@@ -198,7 +202,7 @@ public class CEPMigration11to13Test {
 
 		final Event startEvent = new Event(42, "start", 1.0);
 		final SubEvent middleEvent = new SubEvent(42, "foo", 1.0, 10.0);
-		final Event endEvent=  new Event(42, "end", 1.0);
+		final Event endEvent = new Event(42, "end", 1.0);
 
 		// uncomment these lines for regenerating the snapshot on Flink 1.1
 		/*

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 38ad0f1..d83c191 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.operator;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -42,13 +41,13 @@ import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import static org.junit.Assert.*;
-
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -57,6 +56,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link KeyedCEPPatternOperator} and {@link TimeoutKeyedCEPPatternOperator}.
+ */
 public class CEPOperatorTest extends TestLogger {
 
 	@Rule
@@ -269,7 +274,7 @@ public class CEPOperatorTest extends TestLogger {
 			assertTrue(resultObject instanceof StreamRecord);
 
 			StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>> streamRecord =
-					(StreamRecord<Either<Tuple2<Map<String,List<Event>>,Long>,Map<String,List<Event>>>>) resultObject;
+					(StreamRecord<Either<Tuple2<Map<String, List<Event>>, Long>, Map<String, List<Event>>>>) resultObject;
 
 			assertTrue(streamRecord.getValue() instanceof Either.Left);
 
@@ -299,8 +304,8 @@ public class CEPOperatorTest extends TestLogger {
 		SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
 		SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
 		SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
-		Event endEvent1 =  new Event(42, "end", 1.0);
-		Event endEvent2 =  new Event(42, "end", 2.0);
+		Event endEvent1 = new Event(42, "end", 1.0);
+		Event endEvent2 = new Event(42, "end", 2.0);
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
@@ -493,8 +498,8 @@ public class CEPOperatorTest extends TestLogger {
 		SubEvent middleEvent1 = new SubEvent(42, "foo1", 1.0, 10.0);
 		SubEvent middleEvent2 = new SubEvent(42, "foo2", 1.0, 10.0);
 		SubEvent middleEvent3 = new SubEvent(42, "foo3", 1.0, 10.0);
-		Event endEvent1 =  new Event(42, "end", 1.0);
-		Event endEvent2 =  new Event(42, "end", 2.0);
+		Event endEvent1 = new Event(42, "end", 1.0);
+		Event endEvent2 = new Event(42, "end", 2.0);
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
@@ -747,7 +752,6 @@ public class CEPOperatorTest extends TestLogger {
 		Assert.assertArrayEquals(expected.toArray(), actual.toArray());
 	}
 
-
 	private class ListEventComparator implements Comparator<List<Event>> {
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 86be09c..40514df 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
 import org.junit.Test;
 
 import java.util.List;
@@ -44,6 +45,9 @@ import java.util.Queue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for rescaling of CEP operators.
+ */
 public class CEPRescalingTest {
 
 	@Test
@@ -64,7 +68,7 @@ public class CEPRescalingTest {
 
 		Event startEvent1 = new Event(7, "start", 1.0);
 		SubEvent middleEvent1 = new SubEvent(7, "foo", 1.0, 10.0);
-		Event endEvent1=  new Event(7, "end", 1.0);
+		Event endEvent1 = new Event(7, "end", 1.0);
 
 		int keygroup = KeyGroupRangeAssignment.assignToKeyGroup(keySelector.getKey(startEvent1), maxParallelism);
 		assertEquals(1, keygroup);

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index b6fb484..e00384b 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -25,15 +25,23 @@ import org.apache.flink.cep.pattern.conditions.OrCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.cep.pattern.conditions.SubtypeCondition;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for constructing {@link Pattern}.
+ */
 public class PatternTest extends TestLogger {
+
 	/**
-	 * These test simply test that the pattern construction completes without failure
+	 * These test simply test that the pattern construction completes without failure.
 	 */
-
 	@Test
 	public void testStrictContiguity() {
 		Pattern<Object, ?> pattern = Pattern.begin("start").next("next").next("end");


[3/3] flink git commit: [FLINK-6137] Activate strict checkstyle for flink-cep

Posted by ch...@apache.org.
[FLINK-6137] Activate strict checkstyle for flink-cep

This closes #3976.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c9e574bf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c9e574bf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c9e574bf

Branch: refs/heads/master
Commit: c9e574bf3206c16bd7e1be8a5672073d8846d7d7
Parents: 4f50dc4
Author: dawidwys <wy...@gmail.com>
Authored: Wed May 24 22:27:06 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Sun May 28 10:51:38 2017 +0200

----------------------------------------------------------------------
 .../flink/cep/scala/pattern/Pattern.scala       |    4 +-
 flink-libraries/flink-cep/pom.xml               |   36 +
 .../src/main/java/org/apache/flink/cep/CEP.java |    2 +-
 .../flink/cep/NonDuplicatingTypeSerializer.java |   12 +-
 .../org/apache/flink/cep/PatternStream.java     |    7 +-
 .../org/apache/flink/cep/nfa/DeweyNumber.java   |    6 +-
 .../main/java/org/apache/flink/cep/nfa/NFA.java |   52 +-
 .../org/apache/flink/cep/nfa/SharedBuffer.java  |   27 +-
 .../java/org/apache/flink/cep/nfa/State.java    |   12 +-
 .../apache/flink/cep/nfa/StateTransition.java   |    5 +
 .../flink/cep/nfa/compiler/NFACompiler.java     |   33 +-
 .../AbstractKeyedCEPPatternOperator.java        |    9 +-
 .../flink/cep/operator/CEPOperatorUtils.java    |    8 +-
 .../cep/operator/StreamRecordComparator.java    |    2 +-
 .../flink/cep/pattern/AndFilterFunction.java    |    7 +-
 .../flink/cep/pattern/OrFilterFunction.java     |    7 +-
 .../org/apache/flink/cep/pattern/Pattern.java   |   22 +-
 .../apache/flink/cep/pattern/Quantifier.java    |   22 +-
 .../cep/pattern/SubtypeFilterFunction.java      |    7 +-
 .../pattern/conditions/BooleanConditions.java   |    1 +
 .../pattern/conditions/IterativeCondition.java  |    4 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |   20 +-
 .../test/java/org/apache/flink/cep/Event.java   |    3 +
 .../java/org/apache/flink/cep/SubEvent.java     |    3 +
 .../apache/flink/cep/nfa/DeweyNumberTest.java   |    4 +
 .../cep/nfa/IterativeConditionsITCase.java      |  421 ++++
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 1948 +-----------------
 .../java/org/apache/flink/cep/nfa/NFATest.java  |    8 +-
 .../apache/flink/cep/nfa/NFATestUtilities.java  |  115 ++
 .../apache/flink/cep/nfa/NotPatternITCase.java  | 1036 ++++++++++
 .../apache/flink/cep/nfa/SameElementITCase.java |  407 ++++
 .../apache/flink/cep/nfa/SharedBufferTest.java  |   10 +-
 .../flink/cep/nfa/compiler/NFACompilerTest.java |    6 +-
 .../cep/operator/CEPFrom12MigrationTest.java    |    3 +-
 .../cep/operator/CEPMigration11to13Test.java    |    6 +-
 .../flink/cep/operator/CEPOperatorTest.java     |   22 +-
 .../flink/cep/operator/CEPRescalingTest.java    |    6 +-
 .../apache/flink/cep/pattern/PatternTest.java   |   14 +-
 38 files changed, 2296 insertions(+), 2021 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index c77e70d..3a30836 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -295,7 +295,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * {{{A1 A2 B}}} appears, this will generate patterns:
     * {{{A1 B}}} and {{{A1 A2 B}}}. See also {{{allowCombinations()}}}.
     *
-    * @return The same pattern with a [[Quantifier.ONE_OR_MORE()]] quantifier applied.
+    * @return The same pattern with a [[Quantifier.oneOrMore()]] quantifier applied.
     * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
     */
   def oneOrMore: Pattern[T, F] = {
@@ -316,7 +316,7 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
-    * Applicable only to [[Quantifier.ONE_OR_MORE()]] and [[Quantifier.TIMES()]] patterns,
+    * Applicable only to [[Quantifier.oneOrMore()]] and [[Quantifier.times()]] patterns,
     * this option allows more flexibility to the matching events.
     *
     * If {{{allowCombinations()}}} is not applied for a

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/pom.xml b/flink-libraries/flink-cep/pom.xml
index 35045c0..6622b36 100644
--- a/flink-libraries/flink-cep/pom.xml
+++ b/flink-libraries/flink-cep/pom.xml
@@ -106,6 +106,42 @@ under the License.
                     </execution>
                 </executions>
             </plugin>
+
+			<!-- We have more strict checkstyle rules than the rest of the project -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<dependencies>
+					<dependency>
+						<groupId>com.puppycrawl.tools</groupId>
+						<artifactId>checkstyle</artifactId>
+						<version>6.19</version>
+					</dependency>
+				</dependencies>
+				<configuration>
+					<configLocation>/tools/maven/strict-checkstyle.xml</configLocation>
+					<suppressionsLocation>/tools/maven/suppressions.xml</suppressionsLocation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+					<logViolationsToConsole>true</logViolationsToConsole>
+					<failOnViolation>true</failOnViolation>
+				</configuration>
+				<executions>
+					<!--
+					Execute checkstyle after compilation but before tests.
+
+					This ensures that any parsing or type checking errors are from
+					javac, so they look as expected. Beyond that, we want to
+					fail as early as possible.
+					-->
+					<execution>
+						<phase>test-compile</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+			</plugin>
         </plugins>
     </build>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index 9ce9f77..0ef9c21 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -24,7 +24,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 /**
  * Utility class for complex event processing.
  *
- * Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
+ * <p>Methods which transform a {@link DataStream} into a {@link PatternStream} to do CEP.
  */
 public class CEP {
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
index 9e22fc2..f9e13fe 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/NonDuplicatingTypeSerializer.java
@@ -35,7 +35,7 @@ import java.util.IdentityHashMap;
  * serialized once. If the same object shall be serialized again, then a reference handle is
  * written instead.
  *
- * Avoiding duplication is achieved by keeping an internal identity hash map. This map contains
+ * <p>Avoiding duplication is achieved by keeping an internal identity hash map. This map contains
  * all serialized objects. To make the serializer work it is important that the same serializer
  * is used for a coherent serialization run. After the serialization has stopped, the identity
  * hash map should be cleared.
@@ -107,8 +107,8 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
 
 	/**
 	 * Serializes the given record.
-	 * <p>
-	 * First a boolean indicating whether a reference handle (true) or the object (false) is
+	 *
+	 * <p>First a boolean indicating whether a reference handle (true) or the object (false) is
 	 * written. Then, either the reference handle or the object is written.
 	 *
 	 * @param record The record to serialize.
@@ -128,8 +128,8 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
 
 	/**
 	 * Deserializes an object from the input view.
-	 * <p>
-	 * First it reads a boolean indicating whether a reference handle or a serialized object
+	 *
+	 * <p>First it reads a boolean indicating whether a reference handle or a serialized object
 	 * follows.
 	 *
 	 * @param source The input view from which to read the data.
@@ -172,7 +172,7 @@ public final class NonDuplicatingTypeSerializer<T> extends TypeSerializer<T> {
 	public boolean equals(Object obj) {
 		if (obj instanceof NonDuplicatingTypeSerializer) {
 			@SuppressWarnings("unchecked")
-			NonDuplicatingTypeSerializer<T> other = (NonDuplicatingTypeSerializer<T>)obj;
+			NonDuplicatingTypeSerializer<T> other = (NonDuplicatingTypeSerializer<T>) obj;
 
 			return (other.canEqual(this) && typeSerializer.equals(other.typeSerializer));
 		} else {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 5544689..71614cf 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -40,7 +40,7 @@ import java.util.Map;
  * {@link org.apache.flink.cep.nfa.NFA}. In order to process the detected sequences, the user
  * has to specify a {@link PatternSelectFunction} or a {@link PatternFlatSelectFunction}.
  *
- * Additionally it allows to handle partially matched event patterns which have timed out. For this
+ * <p>Additionally it allows to handle partially matched event patterns which have timed out. For this
  * the user has to specify a {@link PatternTimeoutFunction} or a {@link PatternFlatTimeoutFunction}.
  *
  * @param <T> Type of the events
@@ -119,7 +119,7 @@ public class PatternStream<T> {
 	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
 	 * exactly one resulting element.
 	 *
-	 * Applies a timeout function to a partial pattern sequence which has timed out. For each
+	 * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
 	 * partial pattern sequence the provided {@link PatternTimeoutFunction} is called. The pattern
 	 * timeout function can produce exactly one resulting element.
 	 *
@@ -220,7 +220,7 @@ public class PatternStream<T> {
 	 * the provided {@link PatternFlatSelectFunction} is called. The pattern flat select function
 	 * can produce an arbitrary number of resulting elements.
 	 *
-	 * Applies a timeout function to a partial pattern sequence which has timed out. For each
+	 * <p>Applies a timeout function to a partial pattern sequence which has timed out. For each
 	 * partial pattern sequence the provided {@link PatternFlatTimeoutFunction} is called. The
 	 * pattern timeout function can produce an arbitrary number of resulting elements.
 	 *
@@ -397,7 +397,6 @@ public class PatternStream<T> {
 			this.patternFlatSelectFunction = patternFlatSelectFunction;
 		}
 
-
 		@Override
 		public void flatMap(Map<String, List<T>> value, Collector<R> out) throws Exception {
 			patternFlatSelectFunction.flatSelect(value, out);

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
index 3827956..f066141 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/DeweyNumber.java
@@ -31,7 +31,7 @@ import java.util.Arrays;
 /**
  * Versioning scheme which allows to retrieve dependencies between different versions.
  *
- * A dewey number consists of a sequence of digits d1.d2.d3. ... .dn. A dewey number v is compatible
+ * <p>A dewey number consists of a sequence of digits d1.d2.d3. ... .dn. A dewey number v is compatible
  * to v' iff v contains v' as a prefix or if both dewey number differ only in the last digit and
  * the last digit of v is greater than v'.
  *
@@ -58,7 +58,7 @@ public class DeweyNumber implements Serializable {
 	/**
 	 * Checks whether this dewey number is compatible to the other dewey number.
 	 *
-	 * True iff this contains other as a prefix or iff they differ only in the last digit whereas
+	 * <p>True iff this contains other as a prefix or iff they differ only in the last digit whereas
 	 * the last digit of this is greater than the last digit of other.
 	 *
 	 * @param other The other dewey number to check compatibility against
@@ -106,7 +106,7 @@ public class DeweyNumber implements Serializable {
 
 	/**
 	 * Creates a new dewey number from this such that its last digit is increased by the supplied
-	 * number
+	 * number.
 	 *
 	 * @param times how many times to increase the Dewey number
 	 * @return A new dewey number derived from this whose last digit is increased by given number

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 2be09ad..f438915 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.ListMultimap;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -29,9 +26,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.EnumSerializer;
-import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
 import org.apache.flink.cep.NonDuplicatingTypeSerializer;
@@ -44,7 +41,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.ListMultimap;
+
 import javax.annotation.Nullable;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -68,27 +70,26 @@ import java.util.Stack;
 
 /**
  * Non-deterministic finite automaton implementation.
- * <p>
- * The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator}
+ *
+ * <p>The {@link org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator CEP operator}
  * keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones.
  * When an event gets processed, it updates the NFA's internal state machine.
- * <p>
- * An event that belongs to a partially matched sequence is kept in an internal
+ *
+ * <p>An event that belongs to a partially matched sequence is kept in an internal
  * {@link SharedBuffer buffer}, which is a memory-optimized data-structure exactly for
  * this purpose. Events in the buffer are removed when all the matched sequences that
  * contain them are:
  * <ol>
- *     <li>emitted (success)</li>
- *     <li>discarded (patterns containing NOT)</li>
- *     <li>timed-out (windowed patterns)</li>
+ *  <li>emitted (success)</li>
+ *  <li>discarded (patterns containing NOT)</li>
+ *  <li>timed-out (windowed patterns)</li>
  * </ol>
  *
- * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
- *
- * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
- *     https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
+ * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
  *
  * @param <T> Type of the processed events
+ * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
+ * https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
  */
 public class NFA<T> implements Serializable {
 
@@ -289,7 +290,7 @@ public class NFA<T> implements Serializable {
 		}
 
 		// prune shared buffer based on window length
-		if(windowTime > 0L) {
+		if (windowTime > 0L) {
 			long pruningTimestamp = timestamp - windowTime;
 
 			if (pruningTimestamp < timestamp) {
@@ -360,6 +361,7 @@ public class NFA<T> implements Serializable {
 		int getTotalIgnoreBranches() {
 			return totalIgnoreBranches;
 		}
+
 		int getTotalTakeBranches() {
 			return totalTakeBranches;
 		}
@@ -672,7 +674,7 @@ public class NFA<T> implements Serializable {
 
 	//////////////////////			Fault-Tolerance / Migration			//////////////////////
 
-	private final static String BEGINNING_STATE_NAME = "$beginningState$";
+	private static final String BEGINNING_STATE_NAME = "$beginningState$";
 
 	private void readObject(ObjectInputStream ois) throws IOException, ClassNotFoundException {
 		ois.defaultReadObject();
@@ -747,7 +749,6 @@ public class NFA<T> implements Serializable {
 						}
 					}).getTargetState().getName();
 
-
 				final State<T> previousState = convertedStates.get(previousName);
 
 				computationStates.add(ComputationState.createState(
@@ -783,16 +784,16 @@ public class NFA<T> implements Serializable {
 
 	@SuppressWarnings("unchecked")
 	private ComputationState<T> readComputationState(ObjectInputStream ois) throws IOException, ClassNotFoundException {
-		final State<T> state = (State<T>)ois.readObject();
+		final State<T> state = (State<T>) ois.readObject();
 		State<T> previousState;
 		try {
-			previousState = (State<T>)ois.readObject();
+			previousState = (State<T>) ois.readObject();
 		} catch (OptionalDataException e) {
 			previousState = null;
 		}
 
 		final long timestamp = ois.readLong();
-		final DeweyNumber version = (DeweyNumber)ois.readObject();
+		final DeweyNumber version = (DeweyNumber) ois.readObject();
 		final long startTimestamp = ois.readLong();
 
 		final boolean hasEvent = ois.readBoolean();
@@ -915,7 +916,7 @@ public class NFA<T> implements Serializable {
 			serializeStates(record.states, target);
 			target.writeLong(record.windowTime);
 			target.writeBoolean(record.handleTimeout);
-			
+
 			sharedBufferSerializer.serialize(record.eventSharedBuffer, target);
 
 			target.writeInt(record.computationStates.size());
@@ -948,10 +949,10 @@ public class NFA<T> implements Serializable {
 			Set<State<T>> states = deserializeStates(source);
 			long windowTime = source.readLong();
 			boolean handleTimeout = source.readBoolean();
-			
+
 			NFA<T> nfa = new NFA<>(eventSerializer, windowTime, handleTimeout);
 			nfa.states = states;
-			
+
 			nfa.eventSharedBuffer = sharedBufferSerializer.deserialize(source);
 
 			Queue<ComputationState<T>> computationStates = new LinkedList<>();
@@ -1132,7 +1133,6 @@ public class NFA<T> implements Serializable {
 			TypeSerializer<State.StateType> stateTypeSerializer = new EnumSerializer<>(State.StateType.class);
 			TypeSerializer<StateTransitionAction> actionSerializer = new EnumSerializer<>(StateTransitionAction.class);
 
-
 			final int noOfStates = in.readInt();
 			Map<String, State<T>> states = new HashMap<>(noOfStates);
 
@@ -1250,7 +1250,7 @@ public class NFA<T> implements Serializable {
 				ois.close();
 				bais.close();
 				return copy;
-			} catch (IOException|ClassNotFoundException e) {
+			} catch (IOException | ClassNotFoundException e) {
 				throw new RuntimeException("Could not copy NFA.", e);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
index 91fce1f..a44b333 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/SharedBuffer.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
@@ -37,6 +34,10 @@ import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import org.apache.commons.lang3.StringUtils;
+
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
@@ -57,15 +58,15 @@ import java.util.Stack;
 /**
  * A shared buffer implementation which stores values under a key. Additionally, the values can be
  * versioned such that it is possible to retrieve their predecessor element in the buffer.
- * <p>
- * The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each
+ *
+ * <p>The idea of the implementation is to have for each key a dedicated {@link SharedBufferPage}. Each
  * buffer page maintains a collection of the inserted values.
  *
- * The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store
+ * <p>The values are wrapped in a {@link SharedBufferEntry}. The shared buffer entry allows to store
  * relations between different entries. A dewey versioning scheme allows to discriminate between
  * different relations (e.g. preceding element).
  *
- * The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
+ * <p>The implementation is strongly based on the paper "Efficient Pattern Matching over Event Streams".
  *
  * @see <a href="https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf">
  *     https://people.cs.umass.edu/~yanlei/publications/sase-sigmod08.pdf</a>
@@ -245,7 +246,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 				if (currentEntry == null) {
 					final ListMultimap<K, V> completePath = ArrayListMultimap.create();
 
-					while(!currentPath.isEmpty()) {
+					while (!currentPath.isEmpty()) {
 						final SharedBufferEntry<K, V> currentPathEntry = currentPath.pop();
 
 						completePath.put(currentPathEntry.getKey(), currentPathEntry.getValueTime().getValue());
@@ -398,7 +399,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	public String toString() {
 		StringBuilder builder = new StringBuilder();
 
-		for(Map.Entry<K, SharedBufferPage<K, V>> entry: pages.entrySet()){
+		for (Map.Entry<K, SharedBufferPage<K, V>> entry : pages.entrySet()) {
 			builder.append("Key: ").append(entry.getKey()).append("\n");
 			builder.append("Value: ").append(entry.getValue()).append("\n");
 		}
@@ -644,7 +645,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 	}
 
 	/**
-	 * Versioned edge between two shared buffer entries
+	 * Versioned edge between two shared buffer entries.
 	 *
 	 * @param <K> Type of the key
 	 * @param <V> Type of the value
@@ -747,7 +748,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		public boolean equals(Object obj) {
 			if (obj instanceof ValueTimeWrapper) {
 				@SuppressWarnings("unchecked")
-				ValueTimeWrapper<V> other = (ValueTimeWrapper<V>)obj;
+				ValueTimeWrapper<V> other = (ValueTimeWrapper<V>) obj;
 
 				return timestamp == other.getTimestamp() && value.equals(other.getValue()) && counter == other.getCounter();
 			} else {
@@ -928,7 +929,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 
 				// key for the current page
 				keySerializer.serialize(page.getKey(), target);
-				
+
 				// number of page entries
 				target.writeInt(page.entries.size());
 
@@ -1182,7 +1183,7 @@ public class SharedBuffer<K extends Serializable, V> implements Serializable {
 		for (int i = 0; i < numberPages; i++) {
 			// key of the page
 			@SuppressWarnings("unchecked")
-			K key = (K)ois.readObject();
+			K key = (K) ois.readObject();
 
 			SharedBufferPage<K, V> page = new SharedBufferPage<>(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
index 14395b1..035674c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/State.java
@@ -30,8 +30,8 @@ import java.util.Objects;
 
 /**
  * Represents a state of the {@link NFA}.
- * <p>
- * Each state is identified by a name and a state type. Furthermore, it contains a collection of
+ *
+ * <p>Each state is identified by a name and a state type. Furthermore, it contains a collection of
  * state transitions. The state transitions describe under which conditions it is possible to enter
  * a new state.
  *
@@ -59,7 +59,9 @@ public class State<T> implements Serializable {
 		return stateType == StateType.Final;
 	}
 
-	public boolean isStart() { return stateType == StateType.Start; }
+	public boolean isStart() {
+		return stateType == StateType.Start;
+	}
 
 	public String getName() {
 		return name;
@@ -84,7 +86,7 @@ public class State<T> implements Serializable {
 		addStateTransition(StateTransitionAction.IGNORE, this, condition);
 	}
 
-	public void addIgnore(final State<T> targetState,final IterativeCondition<T> condition) {
+	public void addIgnore(final State<T> targetState, final IterativeCondition<T> condition) {
 		addStateTransition(StateTransitionAction.IGNORE, targetState, condition);
 	}
 
@@ -104,7 +106,7 @@ public class State<T> implements Serializable {
 	public boolean equals(Object obj) {
 		if (obj instanceof State) {
 			@SuppressWarnings("unchecked")
-			State<T> other = (State<T>)obj;
+			State<T> other = (State<T>) obj;
 
 			return name.equals(other.name) &&
 				stateType == other.stateType &&

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
index c6850cc..bb61e09 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/StateTransition.java
@@ -25,6 +25,11 @@ import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import java.io.Serializable;
 import java.util.Objects;
 
+/**
+ * Represents a transition from one {@link State} to another.
+ *
+ * @param <T> type of events that are handled by the {@link IterativeCondition}
+ */
 public class StateTransition<T> implements Serializable {
 	private static final long serialVersionUID = -4825345749997891838L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index 1b31485..8d1d366 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -18,18 +18,6 @@
 
 package org.apache.flink.cep.nfa.compiler;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterators;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -46,6 +34,21 @@ import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterators;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
  * {@link NFAFactory}.
@@ -396,7 +399,7 @@ public class NFACompiler {
 
 		/**
 		 * Creates a "complex" state consisting of given number of states with
-		 * same {@link IterativeCondition}
+		 * same {@link IterativeCondition}.
 		 *
 		 * @param sinkState the state that the created state should point to
 		 * @param times     number of times the state should be copied
@@ -720,8 +723,8 @@ public class NFACompiler {
 
 	/**
 	 * Implementation of the {@link NFAFactory} interface.
-	 * <p>
-	 * The implementation takes the input type serializer, the window time and the set of
+	 *
+	 * <p>The implementation takes the input type serializer, the window time and the set of
 	 * states and their transitions to be able to create an NFA from them.
 	 *
 	 * @param <T> Type of the input events which are processed by the NFA

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index af4b53e..2e3aefd 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -30,9 +30,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
 import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
 import org.apache.flink.api.common.typeutils.base.CollectionSerializerConfigSnapshot;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -40,13 +40,13 @@ import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.migration.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.runtime.state.StateInitializationContext;
-import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -337,7 +337,6 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT>
 					)
 			);
 
-
 		if (migratingFromOldKeyedOperator) {
 			int numberEntries = inputView.readInt();
 			for (int i = 0; i < numberEntries; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index e7b7e65..d00e5e9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.cep.PatternStream;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -39,6 +40,9 @@ import org.apache.flink.types.Either;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Utility methods for creating {@link PatternStream}.
+ */
 public class CEPOperatorUtils {
 
 	/**
@@ -61,7 +65,7 @@ public class CEPOperatorUtils {
 
 		if (inputStream instanceof KeyedStream) {
 			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
-			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
+			KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
 
 			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 
@@ -122,7 +126,7 @@ public class CEPOperatorUtils {
 
 		if (inputStream instanceof KeyedStream) {
 			// We have to use the KeyedCEPPatternOperator which can deal with keyed input streams
-			KeyedStream<T, K> keyedStream= (KeyedStream<T, K>) inputStream;
+			KeyedStream<T, K> keyedStream = (KeyedStream<T, K>) inputStream;
 
 			TypeSerializer<K> keySerializer = keyedStream.getKeyType().createSerializer(keyedStream.getExecutionConfig());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
index b290e7b..30fbc26 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/StreamRecordComparator.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
 import java.util.Comparator;
 
 /**
- * Compares two {@link StreamRecord}s based on their timestamp
+ * Compares two {@link StreamRecord}s based on their timestamp.
  *
  * @param <IN> Type of the value field of the StreamRecord
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
index a7391d5..ef3071f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/AndFilterFunction.java
@@ -21,13 +21,12 @@ package org.apache.flink.cep.pattern;
 import org.apache.flink.api.common.functions.FilterFunction;
 
 /**
- * @deprecated This is only used when migrating from an older Flink version.
- * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead.
- *
- * <p>A filter function which combines two filter functions with a logical and. Thus, the filter
+ * A filter function which combines two filter functions with a logical and. Thus, the filter
  * function only returns true, iff both filters return true.
  *
  * @param <T> Type of the element to filter
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.AndCondition} instead.
  */
 @Deprecated
 public class AndFilterFunction<T> implements FilterFunction<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
index 3620cae..d1c406a 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
@@ -21,13 +21,12 @@ package org.apache.flink.cep.pattern;
 import org.apache.flink.api.common.functions.FilterFunction;
 
 /**
- * @deprecated This is only used when migrating from an older Flink version.
- * Use the {@link org.apache.flink.cep.pattern.conditions.OrCondition} instead.
- *
- * <p>A filter function which combines two filter functions with a logical or. Thus, the filter
+ * A filter function which combines two filter functions with a logical or. Thus, the filter
  * function only returns true, iff at least one of the filter functions holds true.
  *
  * @param <T> Type of the element to filter
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.OrCondition} instead.
  */
 @Deprecated
 public class OrFilterFunction<T> implements FilterFunction<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 2d10b41..2676994 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -30,10 +30,10 @@ import org.apache.flink.util.Preconditions;
 
 /**
  * Base class for a pattern definition.
- * <p>
- * A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create a {@link NFA}.
  *
- * <pre>{@code
+ * <p>A pattern definition is used by {@link org.apache.flink.cep.nfa.compiler.NFACompiler} to create a {@link NFA}.
+ *
+ * <p><pre>{@code
  * Pattern<T, F> pattern = Pattern.<T>begin("start")
  *   .next("middle").subtype(F.class)
  *   .followedBy("end").where(new MyCondition());
@@ -57,8 +57,8 @@ public class Pattern<T, F extends T> {
 	/** Window length in which the pattern match has to occur. */
 	private Time windowTime;
 
-	/** A quantifier for the pattern. By default set to {@link Quantifier#ONE(ConsumingStrategy)}. */
-	private Quantifier quantifier = Quantifier.ONE(ConsumingStrategy.STRICT);
+	/** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}. */
+	private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);
 
 	/**
 	 * Applicable to a {@code times} pattern, and holds
@@ -77,7 +77,7 @@ public class Pattern<T, F extends T> {
 			final ConsumingStrategy consumingStrategy) {
 		this.name = name;
 		this.previous = previous;
-		this.quantifier = Quantifier.ONE(consumingStrategy);
+		this.quantifier = Quantifier.one(consumingStrategy);
 	}
 
 	public Pattern<T, ? extends T> getPrevious() {
@@ -295,13 +295,13 @@ public class Pattern<T, F extends T> {
 	 * {@code A1 A2 B} appears, this will generate patterns:
 	 * {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}.
 	 *
-	 * @return The same pattern with a {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} quantifier applied.
+	 * @return The same pattern with a {@link Quantifier#oneOrMore(ConsumingStrategy)} quantifier applied.
 	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
 	 */
 	public Pattern<T, F> oneOrMore() {
 		checkIfNoNotPattern();
 		checkIfQuantifierApplied();
-		this.quantifier = Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy());
+		this.quantifier = Quantifier.oneOrMore(quantifier.getConsumingStrategy());
 		return this;
 	}
 
@@ -317,14 +317,14 @@ public class Pattern<T, F extends T> {
 		checkIfNoNotPattern();
 		checkIfQuantifierApplied();
 		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
-		this.quantifier = Quantifier.TIMES(quantifier.getConsumingStrategy());
+		this.quantifier = Quantifier.times(quantifier.getConsumingStrategy());
 		this.times = times;
 		return this;
 	}
 
 	/**
-	 * Applicable only to {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} and
-	 * {@link Quantifier#TIMES(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
+	 * Applicable only to {@link Quantifier#oneOrMore(ConsumingStrategy)} and
+	 * {@link Quantifier#times(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
 	 *
 	 * <p>If {@code allowCombinations()} is not applied for a
 	 * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 382c3ba..efc7cf4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -15,11 +15,24 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.cep.pattern;
 
 import java.util.EnumSet;
 import java.util.Objects;
 
+/**
+ * A quantifier describing the Pattern. There are three main groups of {@link Quantifier}.
+ *
+ * <p><ol>
+ *     <li>Single</li>
+ *     <li>Looping</li>
+ *     <li>Times</li>
+ * </ol>
+ *
+ * <p>Each {@link Pattern} can be optional and have a {@link ConsumingStrategy}. Looping and Times also hava an
+ * additional inner consuming strategy that is applied between accepted events in the pattern.
+ */
 public class Quantifier {
 
 	private final EnumSet<QuantifierProperty> properties;
@@ -36,15 +49,15 @@ public class Quantifier {
 		this.consumingStrategy = consumingStrategy;
 	}
 
-	public static Quantifier ONE(final ConsumingStrategy consumingStrategy) {
+	public static Quantifier one(final ConsumingStrategy consumingStrategy) {
 		return new Quantifier(consumingStrategy, QuantifierProperty.SINGLE);
 	}
 
-	public static Quantifier ONE_OR_MORE(final ConsumingStrategy consumingStrategy) {
+	public static Quantifier oneOrMore(final ConsumingStrategy consumingStrategy) {
 		return new Quantifier(consumingStrategy, QuantifierProperty.LOOPING);
 	}
 
-	public static Quantifier TIMES(final ConsumingStrategy consumingStrategy) {
+	public static Quantifier times(final ConsumingStrategy consumingStrategy) {
 		return new Quantifier(consumingStrategy, QuantifierProperty.TIMES);
 	}
 
@@ -118,6 +131,9 @@ public class Quantifier {
 		OPTIONAL
 	}
 
+	/**
+	 * Describes strategy for which events are matched in this {@link Pattern}. See docs for more info.
+	 */
 	public enum ConsumingStrategy {
 		STRICT,
 		SKIP_TILL_NEXT,

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
index ae48df3..f5008c1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/SubtypeFilterFunction.java
@@ -21,13 +21,12 @@ package org.apache.flink.cep.pattern;
 import org.apache.flink.api.common.functions.FilterFunction;
 
 /**
- * @deprecated This is only used when migrating from an older Flink version.
- * Use the {@link org.apache.flink.cep.pattern.conditions.SubtypeCondition} instead.
- *
- * <p>A filter function which filters elements of the given type. A element if filtered out iff it
+ * A filter function which filters elements of the given type. A element if filtered out iff it
  * is not assignable to the given subtype of T.
  *
  * @param <T> Type of the elements to be filtered
+ * @deprecated This is only used when migrating from an older Flink version.
+ * Use the {@link org.apache.flink.cep.pattern.conditions.SubtypeCondition} instead.
  */
 @Deprecated
 public class SubtypeFilterFunction<T> implements FilterFunction<T> {

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
index d67b407..aea5a3b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/BooleanConditions.java
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.cep.pattern.conditions;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index 016cdef..e7c814f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -65,8 +65,8 @@ public abstract class IterativeCondition<T> implements Function, Serializable {
 
 	/**
 	 * The filter function that evaluates the predicate.
-	 * <p>
-	 * <strong>IMPORTANT:</strong> The system assumes that the function does not
+	 *
+	 * <p><strong>IMPORTANT:</strong> The system assumes that the function does not
 	 * modify the elements on which the predicate is applied. Violating this assumption
 	 * can lead to incorrect results.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 9a08659..66eeca8 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -21,6 +21,7 @@ package org.apache.flink.cep;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.core.fs.FileSystem;
@@ -31,8 +32,8 @@ import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-
 import org.apache.flink.types.Either;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -42,6 +43,9 @@ import org.junit.rules.TemporaryFolder;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * End to end tests of both CEP operators and {@link NFA}.
+ */
 @SuppressWarnings("serial")
 public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
@@ -70,7 +74,8 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	/**
-	 * Checks that a certain event sequence is recognized
+	 * Checks that a certain event sequence is recognized.
+	 *
 	 * @throws Exception
 	 */
 	@Test
@@ -224,7 +229,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			Tuple2.of(new Event(5, "middle", 5.0), 7L),
 			// last element for high final watermark
 			Tuple2.of(new Event(5, "middle", 5.0), 100L)
-		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
 
 			@Override
 			public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
@@ -307,7 +312,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			Tuple2.of(new Event(1, "middle", 5.0), 7L),
 			Tuple2.of(new Event(3, "middle", 6.0), 9L),
 			Tuple2.of(new Event(3, "end", 7.0), 7L)
-		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
 
 			@Override
 			public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
@@ -444,7 +449,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			Tuple2.of(new Event(1, "middle", 2.0), 5L),
 			Tuple2.of(new Event(1, "start", 2.0), 4L),
 			Tuple2.of(new Event(1, "end", 2.0), 6L)
-		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+		).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
 
 			@Override
 			public long extractTimestamp(Tuple2<Event, Long> element, long currentTimestamp) {
@@ -515,7 +520,8 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	/**
-	 * Checks that a certain event sequence is recognized with an OR filter
+	 * Checks that a certain event sequence is recognized with an OR filter.
+	 *
 	 * @throws Exception
 	 */
 	@Test
@@ -580,4 +586,4 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute();
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
index efe56b7..ef072ce 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/Event.java
@@ -25,6 +25,9 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 
 import java.util.Objects;
 
+/**
+ * Exemplary event for usage in tests of CEP. See also {@link SubEvent}
+ */
 public class Event {
 	private String name;
 	private double price;

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
index effb382..cf5dc9d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/SubEvent.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.cep;
 
+/**
+ * A subclass of {@link Event} for usage in tests.
+ */
 public class SubEvent extends Event {
 	private final double volume;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
index 8bc010a..e28e77d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/DeweyNumberTest.java
@@ -19,12 +19,16 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+/**
+ * Tests for {@link DeweyNumber}.
+ */
 public class DeweyNumberTest extends TestLogger {
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
new file mode 100644
index 0000000..910907f
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/IterativeConditionsITCase.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+
+/**
+ * IT tests covering {@link IterativeCondition} usage.
+ */
+@SuppressWarnings("unchecked")
+public class IterativeConditionsITCase extends TestLogger {
+
+	//////////////////////			Iterative BooleanConditions			/////////////////////////
+
+	private final Event startEvent1 = new Event(40, "start", 1.0);
+	private final Event startEvent2 = new Event(40, "start", 2.0);
+	private final Event startEvent3 = new Event(40, "start", 3.0);
+	private final Event startEvent4 = new Event(40, "start", 4.0);
+	private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10);
+	private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10);
+	private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10);
+	private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10);
+	private final Event nextOne = new Event(44, "next-one", 1.0);
+	private final Event endEvent = new Event(46, "end", 1.0);
+
+	@Test
+	public void testIterativeWithBranchingPatternEager() {
+		List<List<Event>> actual = testIterativeWithBranchingPattern(true);
+
+		compareMaps(actual,
+			Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+				Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+				Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+				Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+				Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+			)
+		);
+	}
+
+	@Test
+	public void testIterativeWithBranchingPatternCombinations() {
+		List<List<Event>> actual = testIterativeWithBranchingPattern(false);
+
+		compareMaps(actual,
+			Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
+				Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
+				Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
+				Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
+				Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
+				Lists.newArrayList(startEvent1, endEvent, middleEvent1),
+				Lists.newArrayList(startEvent2, endEvent, middleEvent3)
+			)
+		);
+	}
+
+	private List<List<Event>> testIterativeWithBranchingPattern(boolean eager) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1));
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 2));
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 3));
+		inputEvents.add(new StreamRecord<>(startEvent2, 4));
+		inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<Event>(middleEvent4, 5));
+		inputEvents.add(new StreamRecord<>(nextOne, 6));
+		inputEvents.add(new StreamRecord<>(endEvent, 8));
+
+		Pattern<Event, ?> pattern = eager
+			? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		})
+			.followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore()
+			.followedBy("end").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 7056763917392056548L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			})
+			: Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		})
+			.followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore().allowCombinations()
+			.followedBy("end").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 7056763917392056548L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	private static class MySubeventIterCondition extends IterativeCondition<SubEvent> {
+
+		private static final long serialVersionUID = 6215754202506583964L;
+
+		@Override
+		public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception {
+			if (!value.getName().startsWith("foo")) {
+				return false;
+			}
+
+			double sum = 0.0;
+			for (Event event : ctx.getEventsForPattern("middle")) {
+				sum += event.getPrice();
+			}
+			sum += value.getPrice();
+			return Double.compare(sum, 5.0) < 0;
+		}
+	}
+
+	@Test
+	public void testIterativeWithLoopingStartingEager() {
+		List<List<Event>> actual = testIterativeWithLoopingStarting(true);
+
+		compareMaps(actual,
+			Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, startEvent2, endEvent),
+				Lists.newArrayList(startEvent1, endEvent),
+				Lists.newArrayList(startEvent2, endEvent),
+				Lists.newArrayList(startEvent3, endEvent),
+				Lists.newArrayList(endEvent)
+			)
+		);
+	}
+
+	@Test
+	public void testIterativeWithLoopingStartingCombination() {
+		List<List<Event>> actual = testIterativeWithLoopingStarting(false);
+
+		compareMaps(actual,
+			Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, startEvent2, endEvent),
+				Lists.newArrayList(startEvent1, startEvent3, endEvent),
+				Lists.newArrayList(startEvent1, endEvent),
+				Lists.newArrayList(startEvent2, endEvent),
+				Lists.newArrayList(startEvent3, endEvent),
+				Lists.newArrayList(endEvent)
+			)
+		);
+	}
+
+	private List<List<Event>> testIterativeWithLoopingStarting(boolean eager) {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+		inputEvents.add(new StreamRecord<>(startEvent3, 3L));
+		inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+		// for now, a pattern inherits its continuity property from the followedBy() or next(), and the default
+		// behavior (which is the one applied in the case that the pattern graph starts with such a pattern)
+		// of a looping pattern is with relaxed continuity (as in followedBy).
+
+		Pattern<Event, ?> pattern = eager
+			? Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().optional()
+			.followedBy("end").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 7056763917392056548L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			})
+			: Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().allowCombinations().optional()
+			.followedBy("end").where(new SimpleCondition<Event>() {
+				private static final long serialVersionUID = 7056763917392056548L;
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		return feedNFA(inputEvents, nfa);
+	}
+
+	private static class MyEventIterCondition extends IterativeCondition<Event> {
+
+		private static final long serialVersionUID = 6215754202506583964L;
+
+		@Override
+		public boolean filter(Event value, Context<Event> ctx) throws Exception {
+			if (!value.getName().equals("start")) {
+				return false;
+			}
+
+			double sum = 0.0;
+			for (Event event : ctx.getEventsForPattern("start")) {
+				sum += event.getPrice();
+			}
+			sum += value.getPrice();
+			return Double.compare(sum, 5.0) < 0;
+		}
+	}
+
+	@Test
+	public void testIterativeWithPrevPatternDependency() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+		inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).oneOrMore().followedBy("end").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				if (!value.getName().equals("end")) {
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event event : ctx.getEventsForPattern("start")) {
+					sum += event.getPrice();
+				}
+				return Double.compare(sum, 2.0) >= 0;
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns,
+			Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, startEvent2, endEvent),
+				Lists.newArrayList(startEvent2, endEvent)
+			)
+		);
+	}
+
+	@Test
+	public void testIterativeWithABACPattern() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 2L)); //1
+
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2
+		inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 2L)); //2
+
+		inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4
+		inputEvents.add(new StreamRecord<Event>(middleEvent3, 2L)); //3
+		inputEvents.add(new StreamRecord<Event>(middleEvent4, 2L)); //1
+		inputEvents.add(new StreamRecord<>(endEvent, 4L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 2178338526904474690L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getName().startsWith("foo");
+			}
+		}).followedBy("middle2").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = -1223388426808292695L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				if (!value.getName().equals("start")) {
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event e: ctx.getEventsForPattern("middle2")) {
+					sum += e.getPrice();
+				}
+				sum += value.getPrice();
+				return Double.compare(sum, 5.0) <= 0;
+			}
+		}).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 562590474115118323L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("end");
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns,
+			Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent1, endEvent),
+				Lists.newArrayList(startEvent1, middleEvent1, startEvent2, endEvent),
+				Lists.newArrayList(startEvent1, middleEvent2, startEvent4, endEvent),
+				Lists.newArrayList(startEvent2, middleEvent2, startEvent4, endEvent),
+				Lists.newArrayList(startEvent3, middleEvent2, startEvent4, endEvent)
+			)
+		);
+	}
+
+	@Test
+	public void testIterativeWithPrevPatternDependencyAfterBranching() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
+		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
+		inputEvents.add(new StreamRecord<Event>(middleEvent1, 4L));
+		inputEvents.add(new StreamRecord<>(startEvent3, 5L));
+		inputEvents.add(new StreamRecord<Event>(middleEvent2, 6L));
+		inputEvents.add(new StreamRecord<>(endEvent, 7L));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 6215754202506583964L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("start");
+			}
+		}).oneOrMore().followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
+			private static final long serialVersionUID = 2178338526904474690L;
+
+			@Override
+			public boolean filter(SubEvent value) throws Exception {
+				return value.getName().startsWith("foo");
+			}
+		}).followedByAny("end").where(new IterativeCondition<Event>() {
+			private static final long serialVersionUID = 7056763917392056548L;
+
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+				if (!value.getName().equals("end")) {
+					return false;
+				}
+
+				double sum = 0.0;
+				for (Event event : ctx.getEventsForPattern("start")) {
+					sum += event.getPrice();
+				}
+				return Double.compare(sum, 2.0) >= 0;
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns,
+			Lists.<List<Event>>newArrayList(
+				Lists.newArrayList(startEvent1, startEvent2, middleEvent1, endEvent),
+				Lists.newArrayList(startEvent2, middleEvent1, endEvent),
+				Lists.newArrayList(startEvent1, startEvent2, middleEvent2, endEvent),
+				Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent2, endEvent),
+				Lists.newArrayList(startEvent2, startEvent3, middleEvent2, endEvent),
+				Lists.newArrayList(startEvent2, middleEvent2, endEvent),
+				Lists.newArrayList(startEvent3, middleEvent2, endEvent)
+			)
+		);
+	}
+}


[2/3] flink git commit: [FLINK-6137] Activate strict checkstyle for flink-cep

Posted by ch...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index d00bbb7..92b49d3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -18,34 +18,37 @@
 
 package org.apache.flink.cep.nfa;
 
-import com.google.common.collect.Lists;
-import com.google.common.primitives.Doubles;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.SubEvent;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.SimpleCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
-import org.junit.Assert;
+
+import com.google.common.collect.Lists;
 import org.junit.Test;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
 import static org.junit.Assert.assertEquals;
 
+/**
+ * General tests for {@link NFA} features. See also {@link IterativeConditionsITCase}, {@link NotPatternITCase},
+ * {@link SameElementITCase} for more specific tests.
+ */
 @SuppressWarnings("unchecked")
 public class NFAITCase extends TestLogger {
 
@@ -234,7 +237,7 @@ public class NFAITCase extends TestLogger {
 
 	/**
 	 * Tests that the NFA successfully filters out expired elements with respect to the window
-	 * length
+	 * length.
 	 */
 	@Test
 	public void testSimplePatternWithTimeWindowNFA() {
@@ -251,7 +254,6 @@ public class NFAITCase extends TestLogger {
 		events.add(new StreamRecord<>(endEvent = new Event(5, "end", 1.0), 11));
 		events.add(new StreamRecord<>(new Event(6, "end", 1.0), 13));
 
-
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7907391379273505897L;
 
@@ -373,7 +375,7 @@ public class NFAITCase extends TestLogger {
 		SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
 		SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
 		SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
-		Event endEvent=  new Event(46, "end", 1.0);
+		Event endEvent = new Event(46, "end", 1.0);
 
 		inputEvents.add(new StreamRecord<>(startEvent, 1));
 		inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
@@ -1418,7 +1420,7 @@ public class NFAITCase extends TestLogger {
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent,  end1),
+				Lists.newArrayList(startEvent, end1),
 				Lists.newArrayList(end1)
 		));
 	}
@@ -1458,9 +1460,9 @@ public class NFAITCase extends TestLogger {
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent,  middleEvent1, middleEvent2, middleEvent3),
-				Lists.newArrayList(startEvent,  middleEvent1, middleEvent2),
-				Lists.newArrayList(startEvent,  middleEvent1),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
+				Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+				Lists.newArrayList(startEvent, middleEvent1),
 				Lists.newArrayList(startEvent)
 		));
 	}
@@ -1499,10 +1501,10 @@ public class NFAITCase extends TestLogger {
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(middleEvent1,  middleEvent2, middleEvent3),
-			Lists.newArrayList(middleEvent1,  middleEvent2),
+			Lists.newArrayList(middleEvent1, middleEvent2, middleEvent3),
+			Lists.newArrayList(middleEvent1, middleEvent2),
 			Lists.newArrayList(middleEvent1),
-			Lists.newArrayList(middleEvent2,  middleEvent3),
+			Lists.newArrayList(middleEvent2, middleEvent3),
 			Lists.newArrayList(middleEvent2),
 			Lists.newArrayList(middleEvent3)
 		));
@@ -1539,7 +1541,7 @@ public class NFAITCase extends TestLogger {
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent,  middleEvent1),
+				Lists.newArrayList(startEvent, middleEvent1),
 				Lists.newArrayList(startEvent)
 		));
 	}
@@ -1579,9 +1581,9 @@ public class NFAITCase extends TestLogger {
 		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
 		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(startEvent,  middleEvent1, middleEvent2, middleEvent3),
-			Lists.newArrayList(startEvent,  middleEvent1, middleEvent2),
-			Lists.newArrayList(startEvent,  middleEvent1)
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent1)
 		));
 	}
 
@@ -1905,12 +1907,12 @@ public class NFAITCase extends TestLogger {
 	///////////////////////////////         Consecutive           ////////////////////////////////////////
 
 	private static class ConsecutiveData {
-		static final Event startEvent = new Event(40, "c", 1.0);
-		static final Event middleEvent1 = new Event(41, "a", 2.0);
-		static final Event middleEvent2 = new Event(42, "a", 3.0);
-		static final Event middleEvent3 = new Event(43, "a", 4.0);
-		static final Event middleEvent4 = new Event(43, "a", 5.0);
-		static final Event end = new Event(44, "b", 5.0);
+		private static final Event startEvent = new Event(40, "c", 1.0);
+		private static final Event middleEvent1 = new Event(41, "a", 2.0);
+		private static final Event middleEvent2 = new Event(42, "a", 3.0);
+		private static final Event middleEvent3 = new Event(43, "a", 4.0);
+		private static final Event middleEvent4 = new Event(43, "a", 5.0);
+		private static final Event end = new Event(44, "b", 5.0);
 
 		private ConsecutiveData() {
 		}
@@ -2374,7 +2376,6 @@ public class NFAITCase extends TestLogger {
 		assertEquals(true, nfa.isEmpty());
 	}
 
-
 	@Test
 	public void testZeroOrMoreClearingBuffer() {
 		Event startEvent = new Event(40, "c", 1.0);
@@ -2418,385 +2419,6 @@ public class NFAITCase extends TestLogger {
 		assertEquals(true, nfa.isEmpty());
 	}
 
-
-	//////////////////////			Iterative BooleanConditions			/////////////////////////
-
-	private final Event startEvent1 = new Event(40, "start", 1.0);
-	private final Event startEvent2 = new Event(40, "start", 2.0);
-	private final Event startEvent3 = new Event(40, "start", 3.0);
-	private final Event startEvent4 = new Event(40, "start", 4.0);
-	private final SubEvent middleEvent1 = new SubEvent(41, "foo1", 1.0, 10);
-	private final SubEvent middleEvent2 = new SubEvent(42, "foo2", 2.0, 10);
-	private final SubEvent middleEvent3 = new SubEvent(43, "foo3", 3.0, 10);
-	private final SubEvent middleEvent4 = new SubEvent(43, "foo4", 1.0, 10);
-	private final Event nextOne = new Event(44, "next-one", 1.0);
-	private final Event endEvent = new Event(46, "end", 1.0);
-
-	@Test
-	public void testIterativeWithBranchingPatternEager() {
-		List<List<Event>> actual = testIterativeWithBranchingPattern(true);
-
-		compareMaps(actual,
-				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent1),
-						Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
-						Lists.newArrayList(startEvent2, endEvent, middleEvent3)
-				)
-		);
-	}
-
-	@Test
-	public void testIterativeWithBranchingPatternCombinations() {
-		List<List<Event>> actual = testIterativeWithBranchingPattern(false);
-
-		compareMaps(actual,
-				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, endEvent, middleEvent1, middleEvent2, middleEvent4),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent2, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent3, middleEvent1),
-						Lists.newArrayList(startEvent2, endEvent, middleEvent3, middleEvent4),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent4, middleEvent1),
-						Lists.newArrayList(startEvent1, endEvent, middleEvent1),
-						Lists.newArrayList(startEvent2, endEvent, middleEvent3)
-				)
-		);
-	}
-
-	private List<List<Event>> testIterativeWithBranchingPattern(boolean eager) {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		inputEvents.add(new StreamRecord<>(startEvent1, 1));
-		inputEvents.add(new StreamRecord<Event>(middleEvent1, 2));
-		inputEvents.add(new StreamRecord<Event>(middleEvent2, 3));
-		inputEvents.add(new StreamRecord<>(startEvent2, 4));
-		inputEvents.add(new StreamRecord<Event>(middleEvent3, 5));
-		inputEvents.add(new StreamRecord<Event>(middleEvent4, 5));
-		inputEvents.add(new StreamRecord<>(nextOne, 6));
-		inputEvents.add(new StreamRecord<>(endEvent, 8));
-
-		Pattern<Event, ?> pattern = eager
-				? Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 5726188262756267490L;
-
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("start");
-					}
-				})
-				.followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore()
-				.followedBy("end").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 7056763917392056548L;
-
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("end");
-					}
-				})
-				: Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 5726188262756267490L;
-
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("start");
-					}
-				})
-				.followedBy("middle").subtype(SubEvent.class).where(new MySubeventIterCondition()).oneOrMore().allowCombinations()
-				.followedBy("end").where(new SimpleCondition<Event>() {
-					private static final long serialVersionUID = 7056763917392056548L;
-
-					@Override
-					public boolean filter(Event value) throws Exception {
-						return value.getName().equals("end");
-					}
-				});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		return feedNFA(inputEvents, nfa);
-	}
-
-	private static class MySubeventIterCondition extends IterativeCondition<SubEvent> {
-
-		private static final long serialVersionUID = 6215754202506583964L;
-
-		@Override
-		public boolean filter (SubEvent value, Context < SubEvent > ctx) throws Exception {
-			if (!value.getName().startsWith("foo")) {
-				return false;
-			}
-
-			double sum = 0.0;
-			for (Event event : ctx.getEventsForPattern("middle")) {
-				sum += event.getPrice();
-			}
-			sum += value.getPrice();
-			return Double.compare(sum, 5.0) < 0;
-		}
-	}
-
-	@Test
-	public void testIterativeWithLoopingStartingEager() {
-		List<List<Event>> actual = testIterativeWithLoopingStarting(true);
-
-		compareMaps(actual,
-				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, startEvent2, endEvent),
-						Lists.newArrayList(startEvent1, endEvent),
-						Lists.newArrayList(startEvent2, endEvent),
-						Lists.newArrayList(startEvent3, endEvent),
-						Lists.newArrayList(endEvent)
-				)
-		);
-	}
-
-	@Test
-	public void testIterativeWithLoopingStartingCombination() {
-		List<List<Event>> actual = testIterativeWithLoopingStarting(false);
-
-		compareMaps(actual,
-				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, startEvent2, endEvent),
-						Lists.newArrayList(startEvent1, startEvent3, endEvent),
-						Lists.newArrayList(startEvent1, endEvent),
-						Lists.newArrayList(startEvent2, endEvent),
-						Lists.newArrayList(startEvent3, endEvent),
-						Lists.newArrayList(endEvent)
-				)
-		);
-	}
-
-	private List<List<Event>> testIterativeWithLoopingStarting(boolean eager) {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
-		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
-		inputEvents.add(new StreamRecord<>(startEvent3, 3L));
-		inputEvents.add(new StreamRecord<>(endEvent, 4L));
-
-		// for now, a pattern inherits its continuity property from the followedBy() or next(), and the default
-		// behavior (which is the one applied in the case that the pattern graph starts with such a pattern)
-		// of a looping pattern is with relaxed continuity (as in followedBy).
-
-		Pattern<Event, ?> pattern = eager
-				? Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().optional()
-					.followedBy("end").where(new SimpleCondition<Event>() {
-						private static final long serialVersionUID = 7056763917392056548L;
-
-						@Override
-						public boolean filter(Event value) throws Exception {
-							return value.getName().equals("end");
-						}
-					})
-				: Pattern.<Event>begin("start").where(new MyEventIterCondition()).oneOrMore().allowCombinations().optional()
-					.followedBy("end").where(new SimpleCondition<Event>() {
-						private static final long serialVersionUID = 7056763917392056548L;
-
-						@Override
-						public boolean filter(Event value) throws Exception {
-							return value.getName().equals("end");
-						}
-					});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		return feedNFA(inputEvents, nfa);
-	}
-
-	private static class MyEventIterCondition extends IterativeCondition<Event> {
-
-		private static final long serialVersionUID = 6215754202506583964L;
-
-		@Override
-		public boolean filter(Event value, Context<Event> ctx) throws Exception {
-			if (!value.getName().equals("start")) {
-				return false;
-			}
-
-			double sum = 0.0;
-			for (Event event : ctx.getEventsForPattern("start")) {
-				sum += event.getPrice();
-			}
-			sum += value.getPrice();
-			return Double.compare(sum, 5.0) < 0;
-		}
-	}
-
-	@Test
-	public void testIterativeWithPrevPatternDependency() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
-		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
-		inputEvents.add(new StreamRecord<>(endEvent, 4L));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 6215754202506583964L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("start");
-			}
-		}).oneOrMore().followedBy("end").where(new IterativeCondition<Event>() {
-			private static final long serialVersionUID = 7056763917392056548L;
-
-			@Override
-			public boolean filter(Event value, Context<Event> ctx) throws Exception {
-				if (!value.getName().equals("end")) {
-					return false;
-				}
-
-				double sum = 0.0;
-				for (Event event : ctx.getEventsForPattern("start")) {
-					sum += event.getPrice();
-				}
-				return Double.compare(sum, 2.0) >= 0;
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns,
-				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, startEvent2, endEvent),
-						Lists.newArrayList(startEvent2, endEvent)
-				)
-		);
-	}
-
-	@Test
-	public void testIterativeWithABACPattern() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		inputEvents.add(new StreamRecord<>(startEvent1, 1L)); //1
-		inputEvents.add(new StreamRecord<Event>(middleEvent1, 2L)); //1
-
-		inputEvents.add(new StreamRecord<>(startEvent2, 2L)); //2
-		inputEvents.add(new StreamRecord<>(startEvent3, 2L)); //3
-		inputEvents.add(new StreamRecord<Event>(middleEvent2, 2L)); //2
-
-		inputEvents.add(new StreamRecord<>(startEvent4, 2L)); //4
-		inputEvents.add(new StreamRecord<Event>(middleEvent3, 2L)); //3
-		inputEvents.add(new StreamRecord<Event>(middleEvent4, 2L)); //1
-		inputEvents.add(new StreamRecord<>(endEvent, 4L));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 6215754202506583964L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("start");
-			}
-		}).followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
-			private static final long serialVersionUID = 2178338526904474690L;
-
-			@Override
-			public boolean filter(SubEvent value) throws Exception {
-				return value.getName().startsWith("foo");
-			}
-		}).followedBy("middle2").where(new IterativeCondition<Event>() {
-			private static final long serialVersionUID = -1223388426808292695L;
-
-			@Override
-			public boolean filter(Event value, Context<Event> ctx) throws Exception {
-				if (!value.getName().equals("start")) {
-					return false;
-				}
-
-				double sum = 0.0;
-				for (Event e: ctx.getEventsForPattern("middle2")) {
-					sum += e.getPrice();
-				}
-				sum += value.getPrice();
-				return Double.compare(sum, 5.0) <= 0;
-			}
-		}).oneOrMore().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 562590474115118323L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("end");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns,
-				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent1, endEvent),
-						Lists.newArrayList(startEvent1, middleEvent1, startEvent2, endEvent),
-						Lists.newArrayList(startEvent1, middleEvent2, startEvent4, endEvent),
-						Lists.newArrayList(startEvent2, middleEvent2, startEvent4, endEvent),
-						Lists.newArrayList(startEvent3, middleEvent2, startEvent4, endEvent)
-				)
-		);
-	}
-
-	@Test
-	public void testIterativeWithPrevPatternDependencyAfterBranching() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		inputEvents.add(new StreamRecord<>(startEvent1, 1L));
-		inputEvents.add(new StreamRecord<>(startEvent2, 2L));
-		inputEvents.add(new StreamRecord<Event>(middleEvent1, 4L));
-		inputEvents.add(new StreamRecord<>(startEvent3, 5L));
-		inputEvents.add(new StreamRecord<Event>(middleEvent2, 6L));
-		inputEvents.add(new StreamRecord<>(endEvent, 7L));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 6215754202506583964L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("start");
-			}
-		}).oneOrMore().followedByAny("middle1").subtype(SubEvent.class).where(new SimpleCondition<SubEvent>() {
-			private static final long serialVersionUID = 2178338526904474690L;
-
-			@Override
-			public boolean filter(SubEvent value) throws Exception {
-				return value.getName().startsWith("foo");
-			}
-		}).followedByAny("end").where(new IterativeCondition<Event>() {
-			private static final long serialVersionUID = 7056763917392056548L;
-
-			@Override
-			public boolean filter(Event value, Context<Event> ctx) throws Exception {
-				if (!value.getName().equals("end")) {
-					return false;
-				}
-
-				double sum = 0.0;
-				for (Event event : ctx.getEventsForPattern("start")) {
-					sum += event.getPrice();
-				}
-				return Double.compare(sum, 2.0) >= 0;
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns,
-				Lists.<List<Event>>newArrayList(
-						Lists.newArrayList(startEvent1, startEvent2, middleEvent1, endEvent),
-						Lists.newArrayList(startEvent2, middleEvent1, endEvent),
-						Lists.newArrayList(startEvent1, startEvent2, middleEvent2, endEvent),
-						Lists.newArrayList(startEvent1, startEvent2, startEvent3, middleEvent2, endEvent),
-						Lists.newArrayList(startEvent2, startEvent3, middleEvent2, endEvent),
-						Lists.newArrayList(startEvent2, middleEvent2, endEvent),
-						Lists.newArrayList(startEvent3, middleEvent2, endEvent)
-				)
-		);
-	}
-
-
 	///////////////////////////////////////   Skip till next     /////////////////////////////
 
 	@Test
@@ -2809,7 +2431,7 @@ public class NFAITCase extends TestLogger {
 		SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
 		SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
 		SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
-		Event endEvent=  new Event(46, "end", 1.0);
+		Event endEvent = new Event(46, "end", 1.0);
 
 		inputEvents.add(new StreamRecord<>(startEvent, 1));
 		inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
@@ -2868,7 +2490,7 @@ public class NFAITCase extends TestLogger {
 		SubEvent middleEvent3 = new SubEvent(43, "foo3", 1.0, 10.0);
 		SubEvent nextOne1 = new SubEvent(44, "next-one", 1.0, 2.0);
 		SubEvent nextOne2 = new SubEvent(45, "next-one", 1.0, 2.0);
-		Event endEvent=  new Event(46, "end", 1.0);
+		Event endEvent = new Event(46, "end", 1.0);
 
 		inputEvents.add(new StreamRecord<>(startEvent, 1));
 		inputEvents.add(new StreamRecord<Event>(middleEvent1, 3));
@@ -2919,1130 +2541,76 @@ public class NFAITCase extends TestLogger {
 		));
 	}
 
-
-	/////////////////////////////////////////       Not pattern    /////////////////////////////////////////////////
-
 	@Test
-	public void testNotNext() {
+	public void testMultipleTakesVersionCollision() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 
-		Event a1 = new Event(40, "a", 1.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event b1 = new Event(42, "b", 3.0);
-		Event c2 = new Event(43, "c", 4.0);
-		Event d = new Event(43, "d", 4.0);
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(41, "a", 3.0);
+		Event middleEvent3 = new Event(41, "a", 4.0);
+		Event middleEvent4 = new Event(41, "a", 5.0);
+		Event middleEvent5 = new Event(41, "a", 6.0);
+		Event end = new Event(44, "b", 5.0);
 
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(b1, 3));
-		inputEvents.add(new StreamRecord<>(c2, 4));
-		inputEvents.add(new StreamRecord<>(d, 5));
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(middleEvent4, 6));
+		inputEvents.add(new StreamRecord<>(middleEvent5, 7));
+		inputEvents.add(new StreamRecord<>(end, 10));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5167288560432018992L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).notNext("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 2242479288129905510L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 1404509325548220892L;
+			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("c");
 			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -8907427230007830915L;
+		}).followedBy("middle1").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
+				return value.getName().equals("a");
 			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(a1, c1, d),
-			Lists.newArrayList(a1, c2, d)
-		));
-	}
-
-	@Test
-	public void testNotNextNoMatches() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event b1 = new Event(42, "b", 3.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event c2 = new Event(43, "c", 4.0);
-		Event d = new Event(43, "d", 4.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(b1, 2));
-		inputEvents.add(new StreamRecord<>(c1, 3));
-		inputEvents.add(new StreamRecord<>(c2, 4));
-		inputEvents.add(new StreamRecord<>(d, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -339500190577666439L;
+		}).oneOrMore().allowCombinations().followedBy("middle2").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("a");
 			}
-		}).notNext("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -6913980632538046451L;
+		}).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("b");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 3332196998905139891L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 2086563479959018387L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
 		});
 
 		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
 
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		assertEquals(0, matches.size());
-	}
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
 
-	@Test
-	public void testNotNextNoMatchesAtTheEnd() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
 
-		Event a1 = new Event(40, "a", 1.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event c2 = new Event(43, "c", 4.0);
-		Event d = new Event(43, "d", 4.0);
-		Event b1 = new Event(42, "b", 3.0);
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
 
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(c2, 3));
-		inputEvents.add(new StreamRecord<>(d, 4));
-		inputEvents.add(new StreamRecord<>(b1, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 1672995058886176627L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 6003621617520261554L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedByAny("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 887700237024758417L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		}).notNext("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5239529076086933032L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		assertEquals(0, matches.size());
-	}
-
-	@Test
-	public void testNotFollowedBy() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event b1 = new Event(42, "b", 3.0);
-		Event c2 = new Event(43, "c", 4.0);
-		Event d = new Event(43, "d", 4.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(b1, 3));
-		inputEvents.add(new StreamRecord<>(c2, 4));
-		inputEvents.add(new StreamRecord<>(d, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -2641662468313191976L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -3632144132379494778L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 3818766882138348167L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 2033204730795451288L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches,Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(a1, c1, d)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByBeforeOptional() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event b1 = new Event(42, "b", 3.0);
-		Event c2 = new Event(43, "c", 4.0);
-		Event d = new Event(43, "d", 4.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(b1, 3));
-		inputEvents.add(new StreamRecord<>(c2, 4));
-		inputEvents.add(new StreamRecord<>(d, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -2454396370205097543L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 2749547391611263290L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -4989511337298217255L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -8466223836652936608L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches,Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(a1, c1, d)
-		));
-	}
-
-	@Test
-	public void testTimesWithNotFollowedBy() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event b1 = new Event(41, "b", 2.0);
-		Event c = new Event(42, "c", 3.0);
-		Event b2 = new Event(43, "b", 4.0);
-		Event d = new Event(43, "d", 4.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(b1, 2));
-		inputEvents.add(new StreamRecord<>(c, 3));
-		inputEvents.add(new StreamRecord<>(b2, 4));
-		inputEvents.add(new StreamRecord<>(d, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -2568839911852184515L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -3632232424064269636L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 3685596793523534611L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 1960758663575587243L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches,Lists.<List<Event>>newArrayList());
-	}
-
-	@Test
-	public void testIgnoreStateOfTimesWithNotFollowedBy() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event e = new Event(41, "e", 2.0);
-		Event c1 = new Event(42, "c", 3.0);
-		Event b1 = new Event(43, "b", 4.0);
-		Event c2 = new Event(44, "c", 5.0);
-		Event d1 = new Event(45, "d", 6.0);
-		Event d2 = new Event(46, "d", 7.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(d1, 2));
-		inputEvents.add(new StreamRecord<>(e, 1));
-		inputEvents.add(new StreamRecord<>(b1, 3));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(c2, 4));
-		inputEvents.add(new StreamRecord<>(d2, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 2814850350025111940L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 4988756153568853834L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -225909103322018778L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -924294627956373696L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(a1, d1)
-		));
-	}
-
-	@Test
-	public void testTimesWithNotFollowedByAfter() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event e = new Event(41, "e", 2.0);
-		Event c1 = new Event(42, "c", 3.0);
-		Event b1 = new Event(43, "b", 4.0);
-		Event b2 = new Event(44, "b", 5.0);
-		Event d1 = new Event(46, "d", 7.0);
-		Event d2 = new Event(47, "d", 8.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(d1, 2));
-		inputEvents.add(new StreamRecord<>(e, 1));
-		inputEvents.add(new StreamRecord<>(b1, 3));
-		inputEvents.add(new StreamRecord<>(b2, 3));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(d2, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 6193105689601702341L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5195859580923169111L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).times(2).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 4973027956103783831L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 2724622546678984894L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList());
-	}
-
-	@Test
-	public void testNotFollowedByBeforeOptionalAtTheEnd() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event b1 = new Event(42, "b", 3.0);
-		Event c2 = new Event(43, "c", 4.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(b1, 3));
-		inputEvents.add(new StreamRecord<>(c2, 4));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -4289351792573443294L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -4989574608417523507L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).followedByAny("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -5940131818629290579L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).optional();
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches,Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(a1, c1),
-			Lists.newArrayList(a1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByBeforeOptionalTimes() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event b1 = new Event(42, "b", 3.0);
-		Event c2 = new Event(43, "c", 4.0);
-		Event d = new Event(43, "d", 4.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(c1, 2));
-		inputEvents.add(new StreamRecord<>(b1, 3));
-		inputEvents.add(new StreamRecord<>(c2, 4));
-		inputEvents.add(new StreamRecord<>(d, 5));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -7885381452276160322L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 3471511260235826653L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 9073793782452363833L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).times(2).optional().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 7972902718259767076L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches,Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(a1, c1, c2, d)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByWithBranchingAtStart() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event a1 = new Event(40, "a", 1.0);
-		Event b1 = new Event(42, "b", 3.0);
-		Event c1 = new Event(41, "c", 2.0);
-		Event a2 = new Event(41, "a", 4.0);
-		Event c2 = new Event(43, "c", 5.0);
-		Event d = new Event(43, "d", 6.0);
-
-		inputEvents.add(new StreamRecord<>(a1, 1));
-		inputEvents.add(new StreamRecord<>(b1, 2));
-		inputEvents.add(new StreamRecord<>(c1, 3));
-		inputEvents.add(new StreamRecord<>(a2, 4));
-		inputEvents.add(new StreamRecord<>(c2, 5));
-		inputEvents.add(new StreamRecord<>(d, 6));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -7866220136345465444L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).notFollowedBy("notPattern").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 4957837489028234932L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5569569968862808007L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = -8579678167937416269L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("d");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> matches = feedNFA(inputEvents, nfa);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(a2, c2, d)
-		));
-	}
-
-	private static class NotFollowByData {
-		static final Event a1 = new Event(40, "a", 1.0);
-		static final Event b1 = new Event(41, "b", 2.0);
-		static final Event b2 = new Event(42, "b", 3.0);
-		static final Event b3 = new Event(42, "b", 4.0);
-		static final Event c1 = new Event(43, "c", 5.0);
-		static final Event b4 = new Event(42, "b", 6.0);
-		static final Event b5 = new Event(42, "b", 7.0);
-		static final Event b6 = new Event(42, "b", 8.0);
-		static final Event d1 = new Event(43, "d", 9.0);
-
-		private NotFollowByData() {
-		}
-	}
-
-	@Test
-	public void testNotNextAfterOneOrMoreSkipTillNext() {
-		final List<List<Event>> matches = testNotNextAfterOneOrMore(false);
-		assertEquals(0, matches.size());
-	}
-
-	@Test
-	public void testNotNextAfterOneOrMoreSkipTillAny() {
-		final List<List<Event>> matches = testNotNextAfterOneOrMore(true);
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b2, NotFollowByData.d1)
-		));
-	}
-
-	private List<List<Event>> testNotNextAfterOneOrMore(boolean allMatches) {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		int i = 0;
-		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i++));
-
-		Pattern<Event, ?> pattern = Pattern
-			.<Event>begin("a").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("a");
-				}
-			});
-
-		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*")).where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).oneOrMore()
-			.notNext("not c").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("c");
-				}
-			})
-			.followedBy("d").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("d");
-				}
-			});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		return feedNFA(inputEvents, nfa);
-	}
-
-	@Test
-	public void testNotFollowedByNextAfterOneOrMoreEager() {
-		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, false);
-		assertEquals(0, matches.size());
-	}
-
-	@Test
-	public void testNotFollowedByAnyAfterOneOrMoreEager() {
-		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(true, true);
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByNextAfterOneOrMoreCombinations() {
-		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, false);
-		assertEquals(0, matches.size());
-	}
-
-	@Test
-	public void testNotFollowedByAnyAfterOneOrMoreCombinations() {
-		final List<List<Event>> matches = testNotFollowedByAfterOneOrMore(false, true);
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b6, NotFollowByData.d1)
-		));
-	}
-
-	private List<List<Event>> testNotFollowedByAfterOneOrMore(boolean eager, boolean allMatches) {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		int i = 0;
-		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b2, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b3, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
-
-		Pattern<Event, ?> pattern = Pattern
-			.<Event>begin("a").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("a");
-				}
-			});
-
-		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
-			.where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("b");
-				}
-			});
-
-		pattern = (eager ? pattern.oneOrMore() : pattern.oneOrMore().allowCombinations())
-			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("c");
-				}
-			})
-			.followedBy("d").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("d");
-				}
-			});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		return feedNFA(inputEvents, nfa);
-	}
-
-	@Test
-	public void testNotFollowedByAnyBeforeOneOrMoreEager() {
-		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, true);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByAnyBeforeOneOrMoreCombinations() {
-		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, true);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByBeforeOneOrMoreEager() {
-		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(true, false);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByBeforeOneOrMoreCombinations() {
-		final List<List<Event>> matches = testNotFollowedByBeforeOneOrMore(false, false);
-
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
-		));
-	}
-
-	private List<List<Event>> testNotFollowedByBeforeOneOrMore(boolean eager, boolean allMatches) {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		int i = 0;
-		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
-
-		Pattern<Event, ?> pattern = Pattern
-			.<Event>begin("a").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("a");
-				}
-			})
-			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("c");
-				}
-			});
-
-		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
-			.where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("b");
-				}
-			}).oneOrMore();
-
-		pattern = (eager ? pattern : pattern.allowCombinations())
-			.followedBy("d").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("d");
-				}
-			});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		return feedNFA(inputEvents, nfa);
-	}
-
-	@Test
-	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillNext() {
-		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, false);
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillNext() {
-		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, false);
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByBeforeZeroOrMoreEagerSkipTillAny() {
-		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(true, true);
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1)
-		));
-	}
-
-	@Test
-	public void testNotFollowedByBeforeZeroOrMoreCombinationsSkipTillAny() {
-		final List<List<Event>> matches = testNotFollowedByBeforeZeroOrMore(false, true);
-		compareMaps(matches, Lists.<List<Event>>newArrayList(
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b4, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.b6, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b5, NotFollowByData.d1),
-			Lists.newArrayList(NotFollowByData.a1, NotFollowByData.b1, NotFollowByData.b6, NotFollowByData.d1)
-		));
-	}
-
-	private List<List<Event>> testNotFollowedByBeforeZeroOrMore(boolean eager, boolean allMatches) {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		int i = 0;
-		inputEvents.add(new StreamRecord<>(NotFollowByData.a1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.c1, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b4, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b5, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.b6, i++));
-		inputEvents.add(new StreamRecord<>(NotFollowByData.d1, i));
-
-		Pattern<Event, ?> pattern = Pattern
-			.<Event>begin("a").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("a");
-				}
-			})
-			.notFollowedBy("not c").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("c");
-				}
-			});
-
-		pattern = (allMatches ? pattern.followedByAny("b*") : pattern.followedBy("b*"))
-			.where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("b");
-				}
-			}).oneOrMore().optional();
-
-		pattern = (eager ? pattern : pattern.allowCombinations())
-			.followedBy("d").where(new SimpleCondition<Event>() {
-				private static final long serialVersionUID = 5726188262756267490L;
-
-				@Override
-				public boolean filter(Event value) throws Exception {
-					return value.getName().equals("d");
-				}
-			});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		return feedNFA(inputEvents, nfa);
-	}
-
-	@Test
-	public void testEagerZeroOrMoreSameElement() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event startEvent = new Event(40, "c", 1.0);
-		Event middleEvent1 = new Event(41, "a", 2.0);
-		Event middleEvent2 = new Event(42, "a", 3.0);
-		Event middleEvent3 = new Event(43, "a", 4.0);
-		Event end1 = new Event(44, "b", 5.0);
-
-		inputEvents.add(new StreamRecord<>(startEvent, 1));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
-		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
-		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
-		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
-		inputEvents.add(new StreamRecord<>(end1, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).oneOrMore().optional().followedBy("end1").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, middleEvent2, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, middleEvent1, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
-				Lists.newArrayList(startEvent, middleEvent1, end1),
-				Lists.newArrayList(startEvent, end1)
-		));
-	}
-
-	@Test
-	public void testMultipleTakesVersionCollision() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event startEvent = new Event(40, "c", 1.0);
-		Event middleEvent1 = new Event(41, "a", 2.0);
-		Event middleEvent2 = new Event(41, "a", 3.0);
-		Event middleEvent3 = new Event(41, "a", 4.0);
-		Event middleEvent4 = new Event(41, "a", 5.0);
-		Event middleEvent5 = new Event(41, "a", 6.0);
-		Event end = new Event(44, "b", 5.0);
-
-		inputEvents.add(new StreamRecord<>(startEvent, 1));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
-		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
-		inputEvents.add(new StreamRecord<>(middleEvent4, 6));
-		inputEvents.add(new StreamRecord<>(middleEvent5, 7));
-		inputEvents.add(new StreamRecord<>(end, 10));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("middle1").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).oneOrMore().allowCombinations().followedBy("middle2").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).oneOrMore().allowCombinations().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, middleEvent5, end),
-
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
-			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent4, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent5, end),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent4, middleEvent5, end),
 
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end),
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent4, end),
@@ -4054,386 +2622,4 @@ public class NFAITCase extends TestLogger {
 			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end)
 			));
 	}
-
-	@Test
-	public void testZeroOrMoreSameElement() {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event startEvent = new Event(40, "c", 1.0);
-		Event middleEvent1 = new Event(41, "a", 2.0);
-		Event middleEvent1a = new Event(41, "a", 2.0);
-		Event middleEvent2 = new Event(42, "a", 3.0);
-		Event middleEvent3 = new Event(43, "a", 4.0);
-		Event middleEvent3a = new Event(43, "a", 4.0);
-		Event end1 = new Event(44, "b", 5.0);
-
-		inputEvents.add(new StreamRecord<>(startEvent, 1));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
-		inputEvents.add(new StreamRecord<>(new Event(50, "d", 6.0), 5));
-		inputEvents.add(new StreamRecord<>(middleEvent3, 6));
-		inputEvents.add(new StreamRecord<>(middleEvent3a, 6));
-		inputEvents.add(new StreamRecord<>(end1, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).oneOrMore().optional().allowCombinations().followedByAny("end1").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
-
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, middleEvent3a, end1),
-
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent2, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent3, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent2, middleEvent3, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, middleEvent3a, end1),
-
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent2, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent2, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent2, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent2, middleEvent3a, end1),
-				Lists.newArrayList(startEvent, middleEvent3, middleEvent3a, end1),
-
-				Lists.newArrayList(startEvent, middleEvent1, end1),
-				Lists.newArrayList(startEvent, middleEvent1a, end1),
-				Lists.newArrayList(startEvent, middleEvent2, end1),
-				Lists.newArrayList(startEvent, middleEvent3, end1),
-				Lists.newArrayList(startEvent, middleEvent3a, end1),
-
-				Lists.newArrayList(startEvent, end1)
-		));
-	}
-
-	@Test
-	public void testSimplePatternWSameElement() throws Exception {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event startEvent = new Event(40, "c", 1.0);
-		Event middleEvent1 = new Event(41, "a", 2.0);
-		Event end1 = new Event(44, "b", 5.0);
-
-		inputEvents.add(new StreamRecord<>(startEvent, 1));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(end1, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).followedBy("end1").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent, middleEvent1, end1),
-				Lists.newArrayList(startEvent, middleEvent1, end1)
-		));
-	}
-
-	@Test
-	public void testIterativeConditionWSameElement() throws Exception {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event startEvent = new Event(40, "c", 1.0);
-		Event middleEvent1 = new Event(41, "a", 2.0);
-		Event middleEvent1a = new Event(41, "a", 2.0);
-		Event middleEvent1b = new Event(41, "a", 2.0);
-		final Event end = new Event(44, "b", 5.0);
-
-		inputEvents.add(new StreamRecord<>(startEvent, 1));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
-		inputEvents.add(new StreamRecord<>(end, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).oneOrMore().optional().allowCombinations().followedBy("end").where(new IterativeCondition<Event>() {
-
-			private static final long serialVersionUID = -5566639743229703237L;
-
-			@Override
-			public boolean filter(Event value, Context<Event> ctx) throws Exception {
-				double sum = 0.0;
-				for (Event event: ctx.getEventsForPattern("middle")) {
-					sum += event.getPrice();
-				}
-				return Double.compare(sum, 4.0) == 0;
-			}
-
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, end),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b, end)
-		));
-	}
-
-	@Test
-	public void testEndWLoopingWSameElement() throws Exception {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event startEvent = new Event(40, "c", 1.0);
-		Event middleEvent1 = new Event(41, "a", 2.0);
-		Event middleEvent1a = new Event(41, "a", 2.0);
-		Event middleEvent1b = new Event(41, "a", 2.0);
-		final Event end = new Event(44, "b", 5.0);
-
-		inputEvents.add(new StreamRecord<>(startEvent, 1));
-		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1a, 3));
-		inputEvents.add(new StreamRecord<>(middleEvent1b, 3));
-		inputEvents.add(new StreamRecord<>(end, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedByAny("middle").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).oneOrMore().optional();
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent),
-				Lists.newArrayList(startEvent, middleEvent1),
-				Lists.newArrayList(startEvent, middleEvent1a),
-				Lists.newArrayList(startEvent, middleEvent1b),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a),
-				Lists.newArrayList(startEvent, middleEvent1a, middleEvent1b),
-				Lists.newArrayList(startEvent, middleEvent1, middleEvent1a, middleEvent1b)
-		));
-	}
-
-	@Test
-	public void testRepeatingPatternWSameElement() throws Exception {
-		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
-
-		Event startEvent = new Event(40, "c", 1.0);
-		Event middle1Event1 = new Event(40, "a", 2.0);
-		Event middle1Event2 = new Event(40, "a", 3.0);
-		Event middle1Event3 = new Event(40, "a", 4.0);
-		Event middle2Event1 = new Event(40, "b", 5.0);
-
-		inputEvents.add(new StreamRecord<>(startEvent, 1));
-		inputEvents.add(new StreamRecord<>(middle1Event1, 3));
-		inputEvents.add(new StreamRecord<>(middle1Event1, 3));
-		inputEvents.add(new StreamRecord<>(middle1Event2, 3));
-		inputEvents.add(new StreamRecord<>(new Event(40, "d", 6.0), 5));
-		inputEvents.add(new StreamRecord<>(middle2Event1, 6));
-		inputEvents.add(new StreamRecord<>(middle1Event3, 7));
-
-		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("c");
-			}
-		}).followedBy("middle1").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		}).oneOrMore().optional().followedBy("middle2").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("b");
-			}
-		}).optional().followedBy("end").where(new SimpleCondition<Event>() {
-			private static final long serialVersionUID = 5726188262756267490L;
-
-			@Override
-			public boolean filter(Event value) throws Exception {
-				return value.getName().equals("a");
-			}
-		});
-
-		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
-
-		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
-
-		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
-				Lists.newArrayList(startEvent, middle1Event1),
-
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event1),
-				Lists.newArrayList(startEvent, middle2Event1, middle1Event3),
-
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2),
-				Lists.newArrayList(startEvent, middle1Event1, middle2Event1, middle1Event3),
-
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle1Event3),
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle2Event1, middle1Event3),
-
-				Lists.newArrayList(startEvent, middle1Event1, middle1Event1, middle1Event2, middle2Event1, middle1Event3)
-		));
-	}
-
-	/////////////////////////////////////////       Utility        /////////////////////////////////////////////////
-
-	private List<List<Event>> feedNFA(List<StreamRecord<Event>> inputEvents, NFA<Event> nfa) {
-		List<List<Event>> resultingPatterns = new ArrayList<>();
-
-		for (StreamRecord<Event> inputEvent : inputEvents) {
-			Collection<Map<String, List<Event>>> patterns = nfa.process(
-				inputEvent.getValue(),
-				inputEvent.getTimestamp()).f0;
-
-			for (Map<String, List<Event>> p: patterns) {
-				List<Event> res = new ArrayList<>();
-				for (List<Event> le: p.values()) {
-					res.addAll(le);
-				}
-				resultingPatterns.add(res);
-			}
-		}
-		return resultingPatterns;
-	}
-
-	private void compareMaps(List<List<Event>> actual, List<List<Event>> expected) {
-		Assert.assertEquals(expected.size(), actual.size());
-
-		for (List<Event> p: actual) {
-			Collections.sort(p, new EventComparator());
-		}
-
-		for (List<Event> p: expected) {
-			Collections.sort(p, new EventComparator());
-		}
-
-		Collections.sort(actual, new ListEventComparator());
-		Collections.sort(expected, new ListEventComparator());
-		Assert.assertArrayEquals(expected.toArray(), actual.toArray());
-	}
-
-	private class ListEventComparator implements Comparator<List<Event>> {
-
-		@Override
-		public int compare(List<Event> o1, List<Event> o2) {
-			int sizeComp = Integer.compare(o1.size(), o2.size());
-			if (sizeComp == 0) {
-				EventComparator comp = new EventComparator();
-				for (int i = 0; i < o1.size(); i++) {
-					int eventComp = comp.compare(o1.get(i), o2.get(i));
-					if (eventComp != 0) {
-						return eventComp;
-					}
-				}
-				return 0;
-			} else {
-				return sizeComp;
-			}
-		}
-	}
-
-	private class EventComparator implements Comparator<Event> {
-
-		@Override
-		public int compare(Event o1, Event o2) {
-			int nameComp = o1.getName().compareTo(o2.getName());
-			int priceComp = Doubles.compare(o1.getPrice(), o2.getPrice());
-			int idComp = Integer.compare(o1.getId(), o2.getId());
-			if (nameComp == 0) {
-				if (priceComp == 0) {
-					return idComp;
-				} else {
-					return priceComp;
-				}
-			} else {
-				return nameComp;
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c9e574bf/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
index 2619764..2586342 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFATest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.cep.nfa;
 
-import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
@@ -29,6 +28,8 @@ import org.apache.flink.core.memory.DataInputViewStreamWrapper;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.TestLogger;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
@@ -44,6 +45,9 @@ import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 
+/**
+ * Tests for {@link NFA}.
+ */
 public class NFATest extends TestLogger {
 	@Test
 	public void testSimpleNFA() {
@@ -147,7 +151,7 @@ public class NFATest extends TestLogger {
 
 	/**
 	 * Tests that pruning shared buffer elements and computations state use the same window border
-	 * semantics (left side inclusive and right side exclusive)
+	 * semantics (left side inclusive and right side exclusive).
 	 */
 	@Test
 	public void testTimeoutWindowPruningWindowBorders() {