You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/05/03 12:10:09 UTC

[02/50] [abbrv] flink git commit: [FLINK-6208] [cep] Implement skip till next match strategy

[FLINK-6208] [cep] Implement skip till next match strategy


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c35dc0e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c35dc0e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c35dc0e

Branch: refs/heads/table-retraction
Commit: 7c35dc0e40a96f6b8d367b9a80a19e9a22525043
Parents: b527582
Author: Dawid Wysakowicz <da...@getindata.com>
Authored: Wed Apr 26 11:57:53 2017 +0200
Committer: kl0u <kk...@gmail.com>
Committed: Fri Apr 28 14:28:37 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  19 +-
 .../cep/scala/pattern/FollowedByPattern.scala   |  44 --
 .../flink/cep/scala/pattern/Pattern.scala       |  31 +-
 .../flink/cep/scala/pattern/package.scala       |   3 +-
 .../flink/cep/scala/pattern/PatternTest.scala   |  52 +-
 .../flink/cep/nfa/compiler/NFACompiler.java     | 192 ++---
 .../flink/cep/pattern/FollowedByPattern.java    |  33 -
 .../org/apache/flink/cep/pattern/Pattern.java   |  42 +-
 .../apache/flink/cep/pattern/Quantifier.java    |  82 +-
 .../java/org/apache/flink/cep/CEPITCase.java    |  30 +-
 .../org/apache/flink/cep/nfa/NFAITCase.java     | 760 +++++++++----------
 .../flink/cep/nfa/compiler/NFACompilerTest.java |  39 -
 .../flink/cep/operator/CEPOperatorTest.java     |   5 +-
 .../apache/flink/cep/pattern/PatternTest.java   |   9 +-
 14 files changed, 656 insertions(+), 685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 15afdf5..b379615 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -268,7 +268,7 @@ val strictNext: Pattern[Event, _] = start.next("middle")
 </div>
 
 Non-strict contiguity means that other events are allowed to occur in-between two matching events.
-A non-strict contiguity pattern state can be created via the `followedBy` method.
+A non-strict contiguity pattern state can be created via the `followedBy` or `followedByAny` method.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -282,6 +282,23 @@ Pattern<Event, ?> nonStrictNext = start.followedBy("middle");
 val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 {% endhighlight %}
 </div>
+</div>
+
+For non-strict contiguity, one can specify whether only the first succeeding matching event is matched (`followedBy`) or all of them (`followedByAny`). In the latter case, multiple matches are emitted for the same beginning.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Pattern<Event, ?> nonStrictNext = start.followedByAny("middle");
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val nonStrictNext : Pattern[Event, _] = start.followedByAny("middle")
+{% endhighlight %}
+</div>
+
 </div>
 It is also possible to define a temporal constraint for the pattern to be valid.
 For example, one can define that a pattern should occur within 10 seconds via the `within` method. 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
deleted file mode 100644
index 4bda08f..0000000
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/FollowedByPattern.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.cep.scala.pattern
-
-import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern}
-
-object FollowedByPattern {
-  /**
-    * Constructs a new Pattern by wrapping a given Java API Pattern
-    *
-    * @param jfbPattern Underlying Java API Pattern.
-    * @tparam T Base type of the elements appearing in the pattern
-    * @tparam F Subtype of T to which the current pattern operator is constrained
-    * @return New wrapping FollowedByPattern object
-    */
-  def apply[T, F <: T](jfbPattern: JFollowedByPattern[T, F]) =
-    new FollowedByPattern[T, F](jfbPattern)
-}
-
-/**
-  * Pattern operator which signifies that the there is a non-strict temporal contiguity between
-  * itself and its preceding pattern operator. This means that there might be events in between
-  * two matching events. These events are then simply ignored.
-  *
-  * @tparam T Base type of the events
-  * @tparam F Subtype of T to which the operator is currently constrained
-  */
-class FollowedByPattern[T, F <: T](jfbPattern: JFollowedByPattern[T, F])
-  extends Pattern[T, F](jfbPattern)

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 3bdbfcf..65b7ab0 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -46,22 +46,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   /**
     * @return The previous pattern
     */
-  def getPrevious(): Option[Pattern[T, _ <: T]] = {
-    wrapPattern(jPattern.getPrevious())
+  def getPrevious: Option[Pattern[T, _ <: T]] = {
+    wrapPattern(jPattern.getPrevious)
   }
 
   /**
     *
     * @return Name of the pattern operator
     */
-  def getName(): String = jPattern.getName()
+  def getName: String = jPattern.getName
 
   /**
     *
     * @return Window length in which the pattern match has to occur
     */
-  def getWindowTime(): Option[Time] = {
-    Option(jPattern.getWindowTime())
+  def getWindowTime: Option[Time] = {
+    Option(jPattern.getWindowTime)
   }
 
   /**
@@ -70,8 +70,8 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     */
   def getQuantifier: Quantifier = jPattern.getQuantifier
 
-  def getCondition(): Option[IterativeCondition[F]] = {
-    Option(jPattern.getCondition())
+  def getCondition: Option[IterativeCondition[F]] = {
+    Option(jPattern.getCondition)
   }
 
   /**
@@ -208,7 +208,22 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     * @param name Name of the new pattern
     * @return A new pattern which is appended to this one
     */
-  def followedBy(name: String) = FollowedByPattern(jPattern.followedBy(name))
+  def followedBy(name: String): Pattern[T, T] = {
+    Pattern[T, T](jPattern.followedBy(name))
+  }
+
+  /**
+    * Appends a new pattern to the existing one. The new pattern enforces non-strict
+    * temporal contiguity. Unlike [[followedBy]], which matches only the first succeeding
+    * matching event, every succeeding matching event is matched (several matches per start).
+    *
+    * @param name Name of the new pattern
+    * @return A new pattern which is appended to this one
+    */
+  def followedByAny(name: String): Pattern[T, T] = {
+    Pattern[T, T](jPattern.followedByAny(name))
+  }
+
 
   /**
     * Specifies that this pattern is optional for a final match of the pattern

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
index 382c160..26355a5 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/package.scala
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.cep.scala
 
-import org.apache.flink.cep.pattern.{FollowedByPattern => JFollowedByPattern, Pattern => JPattern}
+import org.apache.flink.cep.pattern.{Pattern => JPattern}
 
 package object pattern {
   /**
@@ -31,7 +31,6 @@ package object pattern {
     */
   private[flink] def wrapPattern[T, F <: T](javaPattern: JPattern[T, F])
   : Option[Pattern[T, F]] = javaPattern match {
-    case f: JFollowedByPattern[T, F] => Some(FollowedByPattern[T, F](f))
     case p: JPattern[T, F] => Some(Pattern[T, F](p))
     case _ => None
   }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
index a95dddd..d574513 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/pattern/PatternTest.scala
@@ -22,7 +22,7 @@ import org.junit.Assert._
 import org.junit.Test
 import org.apache.flink.cep.Event
 import org.apache.flink.cep.SubEvent
-import org.apache.flink.cep.pattern.conditions.IterativeCondition.Context
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy
 import org.apache.flink.cep.pattern.conditions._
 
 class PatternTest {
@@ -33,7 +33,7 @@ class PatternTest {
     */
 
   @Test
-  def testStrictContiguity: Unit = {
+  def testStrictContiguity(): Unit = {
     val pattern = Pattern.begin[Event]("start").next("next").next("end")
     val jPattern = JPattern.begin[Event]("start").next("next").next("end")
 
@@ -56,7 +56,7 @@ class PatternTest {
 
 
   @Test
-  def testNonStrictContiguity: Unit = {
+  def testNonStrictContiguity(): Unit = {
     val pattern = Pattern.begin[Event]("start").followedBy("next").followedBy("end")
     val jPattern = JPattern.begin[Event]("start").followedBy("next").followedBy("end")
 
@@ -69,8 +69,8 @@ class PatternTest {
     assertTrue(previous.getPrevious.isDefined)
     assertFalse(preprevious.getPrevious.isDefined)
 
-    assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
-    assertTrue(previous.isInstanceOf[FollowedByPattern[_, _]])
+    assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier.getConsumingStrategy)
+    assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, previous.getQuantifier.getConsumingStrategy)
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "next")
@@ -78,25 +78,25 @@ class PatternTest {
   }
 
   @Test
-  def testStrictContiguityWithCondition: Unit = {
+  def testStrictContiguityWithCondition(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("next")
-      .where((value: Event, ctx: Context[Event]) => value.getName() == "foobar")
+      .where((value: Event, _) => value.getName == "foobar")
       .next("end")
-      .where((value: Event, ctx: Context[Event]) => value.getId() == 42)
+      .where((value: Event, _) => value.getId == 42)
 
     val jPattern = JPattern.begin[Event]("start")
       .next("next")
       .where(new SimpleCondition[Event]() {
         @throws[Exception]
         def filter(value: Event): Boolean = {
-          return value.getName() == "foobar"
+          value.getName == "foobar"
         }
       }).next("end")
       .where(new SimpleCondition[Event]() {
         @throws[Exception]
         def filter(value: Event): Boolean = {
-          return value.getId() == 42
+          value.getId == 42
         }
       })
 
@@ -120,7 +120,7 @@ class PatternTest {
   }
 
   @Test
-  def testPatternWithSubtyping: Unit = {
+  def testPatternWithSubtyping(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("subevent")
       .subtype(classOf[SubEvent])
@@ -150,11 +150,11 @@ class PatternTest {
   }
 
   @Test
-  def testPatternWithSubtypingAndFilter: Unit = {
+  def testPatternWithSubtypingAndFilter(): Unit = {
     val pattern = Pattern.begin[Event]("start")
       .next("subevent")
       .subtype(classOf[SubEvent])
-      .where((value: SubEvent) => false)
+      .where(_ => false)
       .followedBy("end")
 
     val jpattern = JPattern.begin[Event]("start")
@@ -163,7 +163,7 @@ class PatternTest {
       .where(new SimpleCondition[SubEvent]() {
         @throws[Exception]
         def filter(value: SubEvent): Boolean = {
-          return false
+          false
         }
       }).followedBy("end")
 
@@ -178,8 +178,8 @@ class PatternTest {
     assertTrue(previous.getPrevious.isDefined)
     assertFalse(preprevious.getPrevious.isDefined)
 
-    assertTrue(pattern.isInstanceOf[FollowedByPattern[_, _]])
-    assertTrue(previous.getCondition().isDefined)
+    assertEquals(ConsumingStrategy.SKIP_TILL_NEXT, pattern.getQuantifier.getConsumingStrategy)
+    assertTrue(previous.getCondition.isDefined)
 
     assertEquals(pattern.getName, "end")
     assertEquals(previous.getName, "subevent")
@@ -194,25 +194,25 @@ class PatternTest {
       && threeWayEquals(
       pattern.getName,
       pattern.wrappedPattern.getName,
-      jPattern.getName())
+      jPattern.getName)
       //check equal time windows
       && threeWayEquals(
       pattern.getWindowTime.orNull,
       pattern.wrappedPattern.getWindowTime,
-      jPattern.getWindowTime())
+      jPattern.getWindowTime)
       //check congruent class names / types
       && threeWayEquals(
       pattern.getClass.getSimpleName,
       pattern.wrappedPattern.getClass.getSimpleName,
-      jPattern.getClass().getSimpleName())
+      jPattern.getClass.getSimpleName)
       //best effort to confirm congruent filter functions
       && compareFilterFunctions(
-      pattern.getCondition().orNull,
-      jPattern.getCondition())
+      pattern.getCondition.orNull,
+      jPattern.getCondition)
       //recursively check previous patterns
       && checkCongruentRepresentations(
       pattern.getPrevious.orNull,
-      jPattern.getPrevious()))
+      jPattern.getPrevious))
   }
 
   def threeWayEquals(a: AnyRef, b: AnyRef, c: AnyRef): Boolean = {
@@ -233,15 +233,15 @@ class PatternTest {
     (sFilter, jFilter) match {
       //matching types: and-filter; branch and recurse for inner filters
       case (saf: AndCondition[_], jaf: AndCondition[_])
-      => (compareFilterFunctions(saf.getLeft(), jaf.getLeft())
-        && compareFilterFunctions(saf.getRight(), jaf.getRight()))
+      => (compareFilterFunctions(saf.getLeft, jaf.getLeft)
+        && compareFilterFunctions(saf.getRight, jaf.getRight))
       //matching types: subtype-filter
-      case (saf: SubtypeCondition[_], jaf: SubtypeCondition[_]) => true
+      case (_: SubtypeCondition[_], _: SubtypeCondition[_]) => true
       //mismatch: one-sided and/subtype-filter
       case (_: AndCondition[_] | _: SubtypeCondition[_], _) => false
       case (_, _: AndCondition[_] | _: SubtypeCondition[_]) => false
       //from here we can only check mutual presence or absence of a function
-      case (s: IterativeCondition[_], j: IterativeCondition[_]) => true
+      case (_: IterativeCondition[_], _: IterativeCondition[_]) => true
       case (null, null) => true
       case _ => false
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b4e0557..0ca0e14 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -20,32 +20,30 @@ package org.apache.flink.cep.nfa.compiler;
 
 import com.google.common.base.Predicate;
 import com.google.common.collect.Iterators;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.State;
 import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.StateTransitionAction;
-import org.apache.flink.cep.pattern.conditions.BooleanConditions;
-import org.apache.flink.cep.pattern.FollowedByPattern;
 import org.apache.flink.cep.pattern.MalformedPatternException;
-import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.cep.pattern.Quantifier.QuantifierProperty;
+import org.apache.flink.cep.pattern.Quantifier;
+import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.NotCondition;
 import org.apache.flink.streaming.api.windowing.time.Time;
 
-import javax.annotation.Nullable;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
 /**
  * Compiler class containing methods to compile a {@link Pattern} into a {@link NFA} or a
  * {@link NFAFactory}.
@@ -159,25 +157,7 @@ public class NFACompiler {
 
 			State<T> lastSink = sinkState;
 			while (currentPattern.getPrevious() != null) {
-				checkPatternNameUniqueness();
-				usedNames.add(currentPattern.getName());
-
-				if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-					final State<T> looping = createLooping(lastSink);
-
-					if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
-						lastSink = createFirstMandatoryStateOfLoop(looping);
-					} else if (currentPattern instanceof FollowedByPattern &&
-								currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
-						lastSink = createWaitingStateForZeroOrMore(looping, lastSink);
-					} else {
-						lastSink = looping;
-					}
-				} else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
-					lastSink = createTimesState(lastSink, currentPattern.getTimes());
-				} else {
-					lastSink = createSingletonState(lastSink);
-				}
+				lastSink = convertPattern(lastSink);
 				currentPattern = currentPattern.getPrevious();
 
 				final Time currentWindowTime = currentPattern.getWindowTime();
@@ -190,6 +170,29 @@ public class NFACompiler {
 			return lastSink;
 		}
 
+		private State<T> convertPattern(final State<T> sinkState) {
+			final State<T> lastSink;
+			checkPatternNameUniqueness();
+			usedNames.add(currentPattern.getName());
+
+			final Quantifier quantifier = currentPattern.getQuantifier();
+			if (quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
+				final State<T> looping = createLooping(sinkState);
+
+				if (!quantifier.hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+					lastSink = createFirstMandatoryStateOfLoop(looping);
+				} else {
+					lastSink = createWaitingStateForZeroOrMore(looping, sinkState);
+				}
+			} else if (quantifier.hasProperty(Quantifier.QuantifierProperty.TIMES)) {
+				lastSink = createTimesState(sinkState, currentPattern.getTimes());
+			} else {
+				lastSink = createSingletonState(sinkState);
+			}
+
+			return lastSink;
+		}
+
 		/**
 		 * Creates a pair of states that enables relaxed strictness before a zeroOrMore looping state.
 		 *
@@ -197,19 +200,21 @@ public class NFACompiler {
 		 * @param lastSink     the state that the looping one points to
 		 * @return the newly created state
 		 */
+		@SuppressWarnings("unchecked")
 		private State<T> createWaitingStateForZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
-			final State<T> followByState = createNormalState();
-			final State<T> followByStateWithoutProceed = createNormalState();
-
 			final IterativeCondition<T> currentFunction = (IterativeCondition<T>)currentPattern.getCondition();
-			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
 
+			final State<T> followByState = createNormalState();
 			followByState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
-			followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
 			followByState.addTake(loopingState, currentFunction);
 
-			followByStateWithoutProceed.addIgnore(ignoreFunction);
-			followByStateWithoutProceed.addTake(loopingState, currentFunction);
+			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
+			if (ignoreFunction != null) {
+				final State<T> followByStateWithoutProceed = createNormalState();
+				followByState.addIgnore(followByStateWithoutProceed, ignoreFunction);
+				followByStateWithoutProceed.addIgnore(ignoreFunction);
+				followByStateWithoutProceed.addTake(loopingState, currentFunction);
+			}
 
 			return followByState;
 		}
@@ -230,23 +235,7 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createStartState(State<T> sinkState) {
-			checkPatternNameUniqueness();
-			usedNames.add(currentPattern.getName());
-
-			final State<T> beginningState;
-			if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.LOOPING)) {
-				final State<T> loopingState = createLooping(sinkState);
-				if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
-					beginningState = createFirstMandatoryStateOfLoop(loopingState);
-				} else {
-					beginningState = loopingState;
-				}
-			} else if (currentPattern.getQuantifier().hasProperty(QuantifierProperty.TIMES)) {
-				beginningState = createTimesState(sinkState, currentPattern.getTimes());
-			} else {
-				beginningState = createSingletonState(sinkState);
-			}
-
+			final State<T> beginningState = convertPattern(sinkState);
 			beginningState.makeStart();
 
 			return beginningState;
@@ -263,29 +252,26 @@ public class NFACompiler {
 		private State<T> createTimesState(final State<T> sinkState, int times) {
 			State<T> lastSink = sinkState;
 			for (int i = 0; i < times - 1; i++) {
-				lastSink = createSingletonState(
-					lastSink,
-					!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE),
-					false);
+				lastSink = createSingletonState(lastSink, getInnerIgnoreCondition(currentPattern), false);
 			}
 
+			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+
 			// we created the intermediate states in the loop, now we create the start of the loop.
-			if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL)) {
-				return createSingletonState(lastSink, currentPattern instanceof FollowedByPattern, false);
+			if (!currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
+				return createSingletonState(lastSink, ignoreCondition, false);
 			}
 
-			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
-			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
-
 			final State<T> singletonState = createNormalState();
 			singletonState.addTake(lastSink, currentFilterFunction);
-			singletonState.addProceed(sinkState, trueFunction);
+			singletonState.addProceed(sinkState, BooleanConditions.<T>trueFunction());
 
-			if (currentPattern instanceof FollowedByPattern) {
+			if (ignoreCondition != null) {
 				State<T> ignoreState = createNormalState();
 				ignoreState.addTake(lastSink, currentFilterFunction);
-				ignoreState.addIgnore(trueFunction);
-				singletonState.addIgnore(ignoreState, trueFunction);
+				ignoreState.addIgnore(ignoreCondition);
+				singletonState.addIgnore(ignoreState, ignoreCondition);
 			}
 			return singletonState;
 		}
@@ -300,8 +286,10 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createSingletonState(final State<T> sinkState) {
-			return createSingletonState(sinkState, currentPattern instanceof FollowedByPattern,
-					currentPattern.getQuantifier().hasProperty(QuantifierProperty.OPTIONAL));
+			return createSingletonState(
+				sinkState,
+				getIgnoreCondition(currentPattern),
+				currentPattern.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL));
 		}
 
 		/**
@@ -309,12 +297,12 @@ public class NFACompiler {
 		 * of a similar state without the PROCEED edge, so that for each PROCEED transition branches
 		 * in computation state graph  can be created only once.
 		 *
-		 * @param addIgnore if any IGNORE should be added
+		 * @param ignoreCondition condition that should be applied to IGNORE transition
 		 * @param sinkState state that the state being converted should point to
 		 * @return the created state
 		 */
 		@SuppressWarnings("unchecked")
-		private State<T> createSingletonState(final State<T> sinkState, boolean addIgnore, boolean isOptional) {
+		private State<T> createSingletonState(final State<T> sinkState, final IterativeCondition<T> ignoreCondition, final boolean isOptional) {
 			final IterativeCondition<T> currentFilterFunction = (IterativeCondition<T>) currentPattern.getCondition();
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
@@ -325,7 +313,7 @@ public class NFACompiler {
 				singletonState.addProceed(sinkState, trueFunction);
 			}
 
-			if (addIgnore) {
+			if (ignoreCondition != null) {
 				final State<T> ignoreState;
 				if (isOptional) {
 					ignoreState = createNormalState();
@@ -333,7 +321,7 @@ public class NFACompiler {
 				} else {
 					ignoreState = singletonState;
 				}
-				singletonState.addIgnore(ignoreState, getIgnoreCondition(currentPattern));
+				singletonState.addIgnore(ignoreState, ignoreCondition);
 			}
 			return singletonState;
 		}
@@ -352,8 +340,8 @@ public class NFACompiler {
 			final State<T> firstState = createNormalState();
 
 			firstState.addTake(sinkState, currentFilterFunction);
-			if (currentPattern instanceof FollowedByPattern) {
-				final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
+			if (ignoreCondition != null) {
 				firstState.addIgnore(ignoreCondition);
 			}
 			return firstState;
@@ -369,18 +357,16 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createLooping(final State<T> sinkState) {
-
-			final State<T> loopingState = createNormalState();
 			final IterativeCondition<T> filterFunction = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> ignoreCondition = getInnerIgnoreCondition(currentPattern);
 			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
 
+			final State<T> loopingState = createNormalState();
 			loopingState.addProceed(sinkState, trueFunction);
 			loopingState.addTake(filterFunction);
-			if (!currentPattern.getQuantifier().hasProperty(QuantifierProperty.CONSECUTIVE)) {
-				final State<T> ignoreState = createNormalState();
-
-				final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
 
+			if (ignoreCondition != null) {
+				final State<T> ignoreState = createNormalState();
 				ignoreState.addTake(loopingState, filterFunction);
 				ignoreState.addIgnore(ignoreCondition);
 				loopingState.addIgnore(ignoreState, ignoreCondition);
@@ -403,15 +389,37 @@ public class NFACompiler {
 
 		/**
 		 * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
-		 * that corresponds to the specified {@link Pattern}. If the pattern is
-		 * {@link QuantifierProperty#EAGER}, the negated user-specified condition is
-		 * returned. In other case, a condition that always evaluated to {@code true} is
-		 * returned.
+		 * that corresponds to the specified {@link Pattern}. It is applicable only for inner states of a complex
+		 * state like looping or times.
 		 */
+		@SuppressWarnings("unchecked")
+		private IterativeCondition<T> getInnerIgnoreCondition(Pattern<T, ?> pattern) {
+			switch (pattern.getQuantifier().getInnerConsumingStrategy()) {
+				case STRICT:
+					return null;
+				case SKIP_TILL_NEXT:
+					return new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+				case SKIP_TILL_ANY:
+					return BooleanConditions.trueFunction();
+			}
+			return null;
+		}
+
+		/**
+		 * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
+		 * that corresponds to the specified {@link Pattern}. For more on strategy see {@link Quantifier}
+		 */
+		@SuppressWarnings("unchecked")
 		private IterativeCondition<T> getIgnoreCondition(Pattern<T, ?> pattern) {
-			return pattern.getQuantifier().hasProperty(QuantifierProperty.EAGER)
-					? new NotCondition<>((IterativeCondition<T>) pattern.getCondition())
-					: BooleanConditions.<T>trueFunction();
+			switch (pattern.getQuantifier().getConsumingStrategy()) {
+				case STRICT:
+					return null;
+				case SKIP_TILL_NEXT:
+					return new NotCondition<>((IterativeCondition<T>) pattern.getCondition());
+				case SKIP_TILL_ANY:
+					return BooleanConditions.trueFunction();
+			}
+			return null;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
deleted file mode 100644
index 266451c..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/FollowedByPattern.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.pattern;
-
-/**
- * Pattern operator which signifies that the there is a non-strict temporal contiguity between
- * itself and its preceding pattern operator. This means that there might be events in between
- * two matching events. These events are then simply ignored.
- *
- * @param <T> Base type of the events
- * @param <F> Subtype of T to which the operator is currently constrained
- */
-public class FollowedByPattern<T, F extends T> extends Pattern<T, F> {
-	FollowedByPattern(final String name, Pattern<T, ?> previous) {
-		super(name, previous);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 3ce0c73..cef0f85 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -20,6 +20,7 @@ package org.apache.flink.cep.pattern;
 
 import org.apache.flink.api.java.ClosureCleaner;
 import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.pattern.Quantifier.ConsumingStrategy;
 import org.apache.flink.cep.pattern.conditions.AndCondition;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.OrCondition;
@@ -56,8 +57,8 @@ public class Pattern<T, F extends T> {
 	/** Window length in which the pattern match has to occur. */
 	private Time windowTime;
 
-	/** A quantifier for the pattern. By default set to {@link Quantifier#ONE()}. */
-	private Quantifier quantifier = Quantifier.ONE();
+	/** A quantifier for the pattern. By default set to {@link Quantifier#ONE(ConsumingStrategy)}. */
+	private Quantifier quantifier = Quantifier.ONE(ConsumingStrategy.STRICT);
 
 	/**
 	 * Applicable to a {@code times} pattern, and holds
@@ -70,6 +71,15 @@ public class Pattern<T, F extends T> {
 		this.previous = previous;
 	}
 
+	protected Pattern(
+			final String name,
+			final Pattern<T, ? extends T> previous,
+			final ConsumingStrategy consumingStrategy) {
+		this.name = name;
+		this.previous = previous;
+		this.quantifier = Quantifier.ONE(consumingStrategy);
+	}
+
 	public Pattern<T, ? extends T> getPrevious() {
 		return previous;
 	}
@@ -195,7 +205,19 @@ public class Pattern<T, F extends T> {
 	 * @return A new pattern which is appended to this one
 	 */
 	public Pattern<T, T> next(final String name) {
-		return new Pattern<T, T>(name, this);
+		return new Pattern<T, T>(name, this, ConsumingStrategy.STRICT);
+	}
+
+	/**
+	 * Appends a new pattern to the existing one. The new pattern enforces non-strict
+	 * temporal contiguity. This means that a matching event of this pattern and the
+	 * preceding matching event might be interleaved with other events which are ignored.
+	 *
+	 * @param name Name of the new pattern
+	 * @return A new pattern which is appended to this one
+	 */
+	public Pattern<T, T> followedBy(final String name) {
+		return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_NEXT);
 	}
 
 	/**
@@ -206,8 +228,8 @@ public class Pattern<T, F extends T> {
 	 * @param name Name of the new pattern
 	 * @return A new pattern which is appended to this one
 	 */
-	public FollowedByPattern<T, T> followedBy(final String name) {
-		return new FollowedByPattern<T, T>(name, this);
+	public Pattern<T, T> followedByAny(final String name) {
+		return new Pattern<>(name, this, ConsumingStrategy.SKIP_TILL_ANY);
 	}
 
 	/**
@@ -232,12 +254,12 @@ public class Pattern<T, F extends T> {
 	 * {@code A1 A2 B} appears, this will generate patterns:
 	 * {@code A1 B} and {@code A1 A2 B}. See also {@link #allowCombinations()}.
 	 *
-	 * @return The same pattern with a {@link Quantifier#ONE_OR_MORE()} quantifier applied.
+	 * @return The same pattern with a {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} quantifier applied.
 	 * @throws MalformedPatternException if the quantifier is not applicable to this pattern.
 	 */
 	public Pattern<T, F> oneOrMore() {
 		checkIfQuantifierApplied();
-		this.quantifier = Quantifier.ONE_OR_MORE();
+		this.quantifier = Quantifier.ONE_OR_MORE(quantifier.getConsumingStrategy());
 		return this;
 	}
 
@@ -252,14 +274,14 @@ public class Pattern<T, F extends T> {
 	public Pattern<T, F> times(int times) {
 		checkIfQuantifierApplied();
 		Preconditions.checkArgument(times > 0, "You should give a positive number greater than 0.");
-		this.quantifier = Quantifier.TIMES();
+		this.quantifier = Quantifier.TIMES(quantifier.getConsumingStrategy());
 		this.times = times;
 		return this;
 	}
 
 	/**
-	 * Applicable only to {@link Quantifier#ONE_OR_MORE()} and {@link Quantifier#TIMES()} patterns,
-	 * this option allows more flexibility to the matching events.
+	 * Applicable only to {@link Quantifier#ONE_OR_MORE(ConsumingStrategy)} and
+	 * {@link Quantifier#TIMES(ConsumingStrategy)} patterns, this option allows more flexibility to the matching events.
 	 *
 	 * <p>If {@code allowCombinations()} is not applied for a
 	 * pattern {@code A.oneOrMore().followedBy(B)} and a sequence of events

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
index 0332ed4..b0f882c 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Quantifier.java
@@ -24,48 +24,62 @@ public class Quantifier {
 
 	private final EnumSet<QuantifierProperty> properties;
 
-	private Quantifier(final QuantifierProperty first, final QuantifierProperty... rest) {
+	private final ConsumingStrategy consumingStrategy;
+
+	private ConsumingStrategy innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_NEXT;
+
+	private Quantifier(
+			final ConsumingStrategy consumingStrategy,
+			final QuantifierProperty first,
+			final QuantifierProperty... rest) {
 		this.properties = EnumSet.of(first, rest);
+		this.consumingStrategy = consumingStrategy;
 	}
 
-	public static Quantifier ONE() {
-		return new Quantifier(QuantifierProperty.SINGLE);
+	public static Quantifier ONE(final ConsumingStrategy consumingStrategy) {
+		return new Quantifier(consumingStrategy, QuantifierProperty.SINGLE);
 	}
 
-	public static Quantifier ONE_OR_MORE() {
-		return new Quantifier(QuantifierProperty.LOOPING, QuantifierProperty.EAGER);
+	public static Quantifier ONE_OR_MORE(final ConsumingStrategy consumingStrategy) {
+		return new Quantifier(consumingStrategy, QuantifierProperty.LOOPING);
 	}
 
-	public static Quantifier TIMES() {
-		return new Quantifier(QuantifierProperty.TIMES, QuantifierProperty.EAGER);
+	public static Quantifier TIMES(final ConsumingStrategy consumingStrategy) {
+		return new Quantifier(consumingStrategy, QuantifierProperty.TIMES);
 	}
 
 	public boolean hasProperty(QuantifierProperty property) {
 		return properties.contains(property);
 	}
 
-	public void combinations() {
-		if (!hasProperty(QuantifierProperty.SINGLE) && !hasProperty(Quantifier.QuantifierProperty.EAGER)) {
-			throw new MalformedPatternException("Combinations already allowed!");
-		}
+	public ConsumingStrategy getConsumingStrategy() {
+		return consumingStrategy;
+	}
 
-		if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) {
-			properties.remove(Quantifier.QuantifierProperty.EAGER);
-		} else {
-			throw new MalformedPatternException("Combinations not applicable to " + this + "!");
+	public ConsumingStrategy getInnerConsumingStrategy() {
+		return innerConsumingStrategy;
+	}
+
+	private static void checkPattern(boolean condition, Object errorMessage) {
+		if (!condition) {
+			throw new MalformedPatternException(String.valueOf(errorMessage));
 		}
 	}
 
+	public void combinations() { // switches the inner consuming strategy to SKIP_TILL_ANY; throws MalformedPatternException if not allowed
+		checkPattern(!hasProperty(QuantifierProperty.SINGLE), "Combinations not applicable to " + this + "!");
+		checkPattern(innerConsumingStrategy != ConsumingStrategy.STRICT, "You can apply either combinations or consecutive, not both!");
+		checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "Combinations already applied!");
+
+		innerConsumingStrategy = ConsumingStrategy.SKIP_TILL_ANY; // only reachable from the default SKIP_TILL_NEXT
+	}
+
 	public void consecutive() {
-		if (!hasProperty(QuantifierProperty.SINGLE) && hasProperty(Quantifier.QuantifierProperty.CONSECUTIVE)) {
-			throw new MalformedPatternException("Strict continuity already applied!");
-		}
+		checkPattern(hasProperty(QuantifierProperty.LOOPING) || hasProperty(QuantifierProperty.TIMES), "Strict continuity not applicable to " + this + "!");
+		checkPattern(innerConsumingStrategy != ConsumingStrategy.SKIP_TILL_ANY, "You can apply either combinations or consecutive, not both!");
+		checkPattern(innerConsumingStrategy != ConsumingStrategy.STRICT, "Strict continuity already applied!");
 
-		if (hasProperty(Quantifier.QuantifierProperty.LOOPING) || hasProperty(Quantifier.QuantifierProperty.TIMES)) {
-			properties.add(Quantifier.QuantifierProperty.CONSECUTIVE);
-		} else {
-			throw new MalformedPatternException("Strict continuity not applicable to " + this + "!");
-		}
+		innerConsumingStrategy = ConsumingStrategy.STRICT; // only reachable from the default SKIP_TILL_NEXT; replaces the old CONSECUTIVE property
 	}
 
 	public void optional() {
@@ -76,13 +90,21 @@ public class Quantifier {
 	}
 
 	@Override
-	public boolean equals(Object obj) {
-		return obj instanceof Quantifier && properties.equals(((Quantifier)obj).properties);
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+		Quantifier that = (Quantifier) o; // inner strategy is compared too: combinations()/consecutive() change it
+		return Objects.equals(properties, that.properties) && consumingStrategy == that.consumingStrategy &&
+				innerConsumingStrategy == that.innerConsumingStrategy;
 	}
 
 	@Override
 	public int hashCode() {
-		return Objects.hashCode(properties);
+		return Objects.hash(properties, consumingStrategy, innerConsumingStrategy); // same fields as equals()
 	}
 
 	/**
@@ -92,9 +114,13 @@ public class Quantifier {
 		SINGLE,
 		LOOPING,
 		TIMES,
-		EAGER,
-		CONSECUTIVE,
 		OPTIONAL
 	}
 
+	public enum ConsumingStrategy {
+		STRICT,
+		SKIP_TILL_NEXT,
+		SKIP_TILL_ANY
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7c35dc0e/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 3a32175..f62c686 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -96,7 +96,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 				return value.getName().equals("start");
 			}
 		})
-		.followedBy("middle").subtype(SubEvent.class).where(
+		.followedByAny("middle").subtype(SubEvent.class).where(
 				new SimpleCondition<SubEvent>() {
 
 					@Override
@@ -105,7 +105,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 					}
 				}
 			)
-		.followedBy("end").where(new SimpleCondition<Event>() {
+		.followedByAny("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -171,7 +171,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 				return value.getName().equals("start");
 			}
 		})
-			.followedBy("middle").subtype(SubEvent.class).where(
+			.followedByAny("middle").subtype(SubEvent.class).where(
 				new SimpleCondition<SubEvent>() {
 
 					@Override
@@ -180,7 +180,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 					}
 				}
 			)
-			.followedBy("end").where(new SimpleCondition<Event>() {
+			.followedByAny("end").where(new SimpleCondition<Event>() {
 
 				@Override
 				public boolean filter(Event value) throws Exception {
@@ -250,13 +250,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -339,13 +339,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -416,7 +416,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		DataStream<Integer> input = env.fromElements(1, 2);
 
-		Pattern<Integer, ?> pattern = Pattern.<Integer>begin("start").followedBy("end").within(Time.days(1));
+		Pattern<Integer, ?> pattern = Pattern.<Integer>begin("start").followedByAny("end").within(Time.days(1));
 
 		DataStream<Integer> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Integer, Integer>() {
 			@Override
@@ -470,13 +470,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
@@ -538,7 +538,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 					return value.getName().equals("start");
 				}
 			})
-			.followedBy("middle")
+			.followedByAny("middle")
 			.where(new SimpleCondition<Event>() {
 				@Override
 				public boolean filter(Event value) throws Exception {
@@ -551,7 +551,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 					return value.getPrice() == 5.0;
 				}
 			})
-			.followedBy("end").where(new SimpleCondition<Event>() {
+			.followedByAny("end").where(new SimpleCondition<Event>() {
 
 				@Override
 				public boolean filter(Event value) throws Exception {
@@ -623,13 +623,13 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("start");
 			}
-		}).followedBy("middle").where(new SimpleCondition<Event>() {
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {
 				return value.getName().equals("middle");
 			}
-		}).followedBy("end").where(new SimpleCondition<Event>() {
+		}).followedByAny("end").where(new SimpleCondition<Event>() {
 
 			@Override
 			public boolean filter(Event value) throws Exception {