You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by dw...@apache.org on 2017/06/27 07:46:24 UTC

flink git commit: [FLINK-6418][cep] Support for dynamic state changes in CEP patterns

Repository: flink
Updated Branches:
  refs/heads/master cd8932f5d -> 34d14652e


[FLINK-6418][cep] Support for dynamic state changes in CEP patterns

This closes #4143.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/34d14652
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/34d14652
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/34d14652

Branch: refs/heads/master
Commit: 34d14652e5cc20e1801061b184f2eb4b48c692c6
Parents: cd8932f
Author: Dawid Wysakowicz <dw...@apache.org>
Authored: Mon Jun 19 16:40:02 2017 +0200
Committer: Dawid Wysakowicz <dw...@apache.org>
Committed: Tue Jun 27 09:21:14 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            | 297 ++++++----
 .../flink/cep/scala/pattern/Pattern.scala       |  49 ++
 .../flink/cep/nfa/compiler/NFACompiler.java     |  55 +-
 .../org/apache/flink/cep/pattern/Pattern.java   |  30 +
 .../org/apache/flink/cep/nfa/NFAITCase.java     |  36 ++
 .../flink/cep/nfa/UntilConditionITCase.java     | 570 +++++++++++++++++++
 .../apache/flink/cep/pattern/PatternTest.java   |  18 +
 7 files changed, 918 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/34d14652/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ab0e23d..63f61f7 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -25,13 +25,13 @@ under the License.
 
 FlinkCEP is the Complex Event Processing (CEP) library implemented on top of Flink.
 It allows you to easily detect event patterns in an endless stream of events, thus
-giving you the opportunity to quickly get hold of what's really important in your 
+giving you the opportunity to quickly get hold of what's really important in your
 data.
 
-This page describes the API calls available in Flink CEP. We start by presenting the [Pattern API](#the-pattern-api), 
-which allows you to specify the patterns that you want to detect in your stream, before presenting how you can 
-[detect and act upon matching event sequences](#detecting-patterns). At the end, we present the assumptions the CEP 
-library makes when [dealing with lateness](#handling-lateness-in-event-time) in event time and how you can 
+This page describes the API calls available in Flink CEP. We start by presenting the [Pattern API](#the-pattern-api),
+which allows you to specify the patterns that you want to detect in your stream, before presenting how you can
+[detect and act upon matching event sequences](#detecting-patterns). At the end, we present the assumptions the CEP
+library makes when [dealing with lateness](#handling-lateness-in-event-time) in event time and how you can
 [migrate your job](#migrating-from-an-older-flink-version) from an older Flink version to Flink-1.3.
 
 * This will be replaced by the TOC
@@ -39,7 +39,7 @@ library makes when [dealing with lateness](#handling-lateness-in-event-time) in
 
 ## Getting Started
 
-If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html) and 
+If you want to jump right in, you have to [set up a Flink program]({{ site.baseurl }}/dev/linking_with_flink.html) and
 add the FlinkCEP dependency to the `pom.xml` of your project.
 
 <div class="codetabs" markdown="1">
@@ -130,10 +130,10 @@ val result: DataStream[Alert] = patternStream.select(createAlert(_))
 
 ## The Pattern API
 
-The pattern API allows you to quickly define complex pattern sequences that you want to extract 
+The pattern API allows you to quickly define complex pattern sequences that you want to extract
 from your input stream.
 
-Each such complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for 
+Each such complex pattern sequence consists of multiple simple patterns, i.e. patterns looking for
 individual events with the same properties. From now on, these simple patterns will be called **patterns**, and
 the final complex pattern sequence we are searching for in the stream, the **pattern sequence**. A pattern sequence
 can be seen as a graph of such patterns, where transitions from one pattern to the next occur based on user-specified
@@ -150,10 +150,10 @@ combine individual patterns into [Complex Patterns](#combining-patterns).
 
 ### Individual Patterns
 
-A **Pattern** can be either a *singleton* pattern, or a *looping* one. Singleton patterns accept a single 
-event, while looping ones can accept more than one. In pattern matching symbols, in the pattern `"a b+ c? d"` (or `"a"`, 
-followed by *one or more* `"b"`'s, optionally followed by a `"c"`, followed by a `"d"`), `a`, `c?`, and `d` are 
-singleton patterns, while `b+` is a looping one. By default, a pattern is a singleton pattern and you can transform 
+A **Pattern** can be either a *singleton* pattern, or a *looping* one. Singleton patterns accept a single
+event, while looping ones can accept more than one. In pattern matching symbols, in the pattern `"a b+ c? d"` (or `"a"`,
+followed by *one or more* `"b"`'s, optionally followed by a `"c"`, followed by a `"d"`), `a`, `c?`, and `d` are
+singleton patterns, while `b+` is a looping one. By default, a pattern is a singleton pattern and you can transform
 it to a looping one by using [Quantifiers](#quantifiers). In addition, each pattern can have one or more
 [Conditions](#conditions) based on which it accepts events.
 
@@ -161,37 +161,37 @@ it to a looping one by using [Quantifiers](#quantifiers). In addition, each patt
 
 In FlinkCEP, looping patterns can be specified using these methods: `pattern.oneOrMore()`, for patterns that expect one or
 more occurrences of a given event (e.g. the `b+` mentioned previously); and `pattern.times(#ofTimes)`, for patterns that
-expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s. All patterns, looping or not, can be made 
+expect a specific number of occurrences of a given type of event, e.g. 4 `a`'s. All patterns, looping or not, can be made
 optional using the `pattern.optional()` method. For a pattern named `start`, the following are valid quantifiers:
- 
+
  <div class="codetabs" markdown="1">
  <div data-lang="java" markdown="1">
  {% highlight java %}
  // expecting 4 occurrences
  start.times(4);
-  
+
  // expecting 0 or 4 occurrences
  start.times(4).optional();
- 
+
  // expecting 1 or more occurrences
  start.oneOrMore();
-   
+
  // expecting 0 or more occurrences
  start.oneOrMore().optional();
  {% endhighlight %}
  </div>
- 
+
  <div data-lang="scala" markdown="1">
  {% highlight scala %}
  // expecting 4 occurrences
  start.times(4)
-   
+
  // expecting 0 or 4 occurrences
  start.times(4).optional()
-  
+
  // expecting 1 or more occurrences
  start.oneOrMore()
-    
+
  // expecting 0 or more occurrences
  start.oneOrMore().optional()
  {% endhighlight %}
@@ -200,29 +200,29 @@ optional using the `pattern.optional()` method. For a pattern named `start`, the
 
 #### Conditions
 
-At every pattern, and in order to go from one pattern to the next, you can specify additional **conditions**. 
+At every pattern, and in order to go from one pattern to the next, you can specify additional **conditions**.
 These conditions can be related to:
- 
- 1. a [property of the incoming event](#conditions-on-properties), e.g. its value should be larger than 5, 
+
+ 1. a [property of the incoming event](#conditions-on-properties), e.g. its value should be larger than 5,
  or larger than the average value of the previously accepted events.
 
- 2. the [contiguity of the matching events](#conditions-on-contiguity), e.g. detect pattern `a,b,c` without 
+ 2. the [contiguity of the matching events](#conditions-on-contiguity), e.g. detect pattern `a,b,c` without
  non-matching events between any matching ones.
- 
-The latter refers to "looping" patterns, *i.e.* patterns that can accept more than one event, e.g. the `b+` in `a b+ c`, 
+
+The latter refers to "looping" patterns, *i.e.* patterns that can accept more than one event, e.g. the `b+` in `a b+ c`,
 which searches for one or more `b`'s.
 
 ##### Conditions on Properties
 
-Conditions on the event properties can be specified via the `pattern.where()` or the `pattern.or()` method. These can 
+Conditions on the event properties can be specified via the `pattern.where()`, `pattern.or()` or the `pattern.until()` method. These can
 be either `IterativeCondition`s or `SimpleCondition`s.
 
-**Iterative Conditions:** This is the most general type of conditions. This is how you can specify a condition that 
-accepts subsequent events based on properties of the previously accepted events or some statistic over a subset of them. 
+**Iterative Conditions:** This is the most general type of conditions. This is how you can specify a condition that
+accepts subsequent events based on properties of the previously accepted events or some statistic over a subset of them.
 
-Below is the code for an iterative condition that accepts the next event for a pattern named "middle" if its name starts 
-with "foo", and if the sum of the prices of the previously accepted events for that pattern plus the price of the current 
-event do not exceed the value of 5.0. Iterative conditions can be very powerful, especially in combination with looping 
+Below is the code for an iterative condition that accepts the next event for a pattern named "middle" if its name starts
+with "foo", and if the sum of the prices of the previously accepted events for that pattern plus the price of the current
+event do not exceed the value of 5.0. Iterative conditions can be very powerful, especially in combination with looping
 patterns, e.g. `oneOrMore()`.
 
 <div class="codetabs" markdown="1">
@@ -234,7 +234,7 @@ middle.oneOrMore().where(new IterativeCondition<SubEvent>() {
         if (!value.getName().startsWith("foo")) {
             return false;
         }
-        
+
         double sum = value.getPrice();
         for (Event event : ctx.getEventsForPattern("middle")) {
             sum += event.getPrice();
@@ -258,10 +258,10 @@ middle.oneOrMore().where(
 </div>
 
 {% warn Attention %} The call to `context.getEventsForPattern(...)` finds all the
-previously accepted events for a given potential match. The cost of this operation can vary, so when implementing 
+previously accepted events for a given potential match. The cost of this operation can vary, so when implementing
 your condition, try to minimize its use.
 
-**Simple Conditions:** This type of condition extends the aforementioned `IterativeCondition` class and decides 
+**Simple Conditions:** This type of condition extends the aforementioned `IterativeCondition` class and decides
 whether to accept an event or not, based *only* on properties of the event itself.
 
 <div class="codetabs" markdown="1">
@@ -283,7 +283,7 @@ start.where(event => event.getName.startsWith("foo"))
 </div>
 </div>
 
-Finally, we can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`) 
+Finally, we can also restrict the type of the accepted event to some subtype of the initial event type (here `Event`)
 via the `pattern.subtype(subClass)` method.
 
 <div class="codetabs" markdown="1">
@@ -305,9 +305,9 @@ start.subtype(classOf[SubEvent]).where(subEvent => ... /* some condition */)
 </div>
 </div>
 
-**Combining Conditions:** As shown, the `subtype` condition can be combined with additional conditions. 
-In fact, this holds for every condition. You can arbitrarily combine conditions by sequentially calling 
-`where()`. The final result will be the logical **AND** of the results of the individual conditions. In 
+**Combining Conditions:** As shown, the `subtype` condition can be combined with additional conditions.
+In fact, this holds for every condition. You can arbitrarily combine conditions by sequentially calling
+`where()`. The final result will be the logical **AND** of the results of the individual conditions. In
 order to combine conditions using **OR**, you can use the `or()` method, as shown below.
 
 <div class="codetabs" markdown="1">
@@ -334,6 +334,20 @@ pattern.where(event => ... /* some condition */).or(event => ... /* or condition
 </div>
 </div>
 
+
+**Stop condition:** In case of looping patterns (`oneOrMore()` and `oneOrMore().optional()`) you can
+also specify a stop condition, e.g. accept events with value larger than 5 until the sum of values is smaller than 50.
+
+To better understand it, have a look at the following example. Given
+
+* pattern like `"(a+ until b)"` (one or more `"a"` until `"b"`)
+
+* a sequence of incoming events `"a1" "c" "a2" "b" "a3"`
+
+* the library will output results: `{a1 a2} {a1} {a2} {a3}`.
+
+As you can see `{a1 a2 a3}` or `{a2 a3}` are not returned due to the stop condition.
+
 ##### Conditions on Contiguity
 
 FlinkCEP supports the following forms of contiguity between events:
@@ -342,21 +356,21 @@ FlinkCEP supports the following forms of contiguity between events:
  without any non-matching events in-between.
 
  2. Relaxed Contiguity: which simply ignores non-matching events appearing in-between the matching ones.
- 
- 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity, allowing additional matches 
+
+ 3. Non-Deterministic Relaxed Contiguity: which further relaxes contiguity, allowing additional matches
  that ignore some matching events.
 
-To illustrate the above with an example, a pattern sequence `"a+ b"` (one or more `"a"`'s followed by a `"b"`) with 
+To illustrate the above with an example, a pattern sequence `"a+ b"` (one or more `"a"`'s followed by a `"b"`) with
 input `"a1", "c", "a2", "b"` will have the following results:
 
  1. Strict Contiguity: `{a2 b}` -- the `"c"` after `"a1"` causes `"a1"` to be discarded.
 
  2. Relaxed Contiguity: `{a1 b}` and `{a1 a2 b}` -- `c` is simply ignored.
- 
+
  3. Non-Deterministic Relaxed Contiguity: `{a1 b}`, `{a2 b}`, and `{a1 a2 b}`.
- 
-For looping patterns (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want 
-strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want 
+
+For looping patterns (e.g. `oneOrMore()` and `times()`) the default is *relaxed contiguity*. If you want
+strict contiguity, you have to explicitly specify it by using the `consecutive()` call, and if you want
 *non-deterministic relaxed contiguity* you can use the `allowCombinations()` call.
 
 {% warn Attention %}
@@ -393,7 +407,7 @@ pattern.where(new IterativeCondition<Event>() {
         <tr>
             <td><strong>or(condition)</strong></td>
             <td>
-                <p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it 
+                <p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it
                 passes at least one of the conditions:</p>
 {% highlight java %}
 pattern.where(new IterativeCondition<Event>() {
@@ -409,11 +423,28 @@ pattern.where(new IterativeCondition<Event>() {
 });
 {% endhighlight %}
                     </td>
-                </tr>
+       </tr>
+              <tr>
+                 <td><strong>until(condition)</strong></td>
+                 <td>
+                     <p>Specifies a stop condition for looping pattern. Meaning if event matching the given condition occurs, no more
+                     events will be accepted into the pattern.</p>
+                     <p>Applicable only in conjunction with <code>oneOrMore()</code></p>
+                     <p><b>NOTE:</b> It allows for cleaning state for corresponding pattern on event-based condition.</p>
+{% highlight java %}
+pattern.oneOrMore().until(new IterativeCondition<Event>() {
+    @Override
+    public boolean filter(Event value, Context ctx) throws Exception {
+        return ... // alternative condition
+    }
+});
+{% endhighlight %}
+                 </td>
+              </tr>
        <tr>
            <td><strong>subtype(subClass)</strong></td>
            <td>
-               <p>Defines a subtype condition for the current pattern. An event can only match the pattern if it is 
+               <p>Defines a subtype condition for the current pattern. An event can only match the pattern if it is
                 of this subtype:</p>
 {% highlight java %}
 pattern.subtype(SubEvent.class);
@@ -426,6 +457,7 @@ pattern.subtype(SubEvent.class);
               <p>Specifies that this pattern expects at least one occurrence of a matching event.</p>
               <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
               internal contiguity see <a href="#consecutive_java">consecutive</a>.</p>
+              <p><b>NOTE:</b> It is advised to use either <code>until()</code> or <code>within()</code> to enable state clearing</p>
 {% highlight java %}
 pattern.oneOrMore();
 {% endhighlight %}
@@ -445,7 +477,7 @@ pattern.times(2);
        <tr>
           <td><strong>optional()</strong></td>
           <td>
-              <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all 
+              <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all
               aforementioned quantifiers.</p>
 {% highlight java %}
 pattern.oneOrMore().optional();
@@ -458,7 +490,7 @@ pattern.oneOrMore().optional();
               <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes strict contiguity between the matching
               events, i.e. any non-matching element breaks the match (as in <code>next()</code>).</p>
               <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>
-            
+
               <p>E.g. a pattern like:</p>
 {% highlight java %}
 Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@@ -481,7 +513,7 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 });
 {% endhighlight %}
               <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
-            
+
               <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
               <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
           </td>
@@ -492,7 +524,7 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
               <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes non-deterministic relaxed contiguity
               between the matching events (as in <code>followedByAny()</code>).</p>
               <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>
-                   
+
               <p>E.g. a pattern like:</p>
 {% highlight java %}
 Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@@ -515,7 +547,7 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 });
 {% endhighlight %}
                <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
-               
+
                <p>with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}</p>
                <p>without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
        </td>
@@ -533,7 +565,7 @@ Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
         </tr>
 	    </thead>
     <tbody>
-      
+
         <tr>
             <td><strong>where(condition)</strong></td>
             <td>
@@ -547,7 +579,7 @@ pattern.where(event => ... /* some condition */)
         <tr>
             <td><strong>or(condition)</strong></td>
             <td>
-                <p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it 
+                <p>Adds a new condition which is ORed with an existing one. An event can match the pattern only if it
                 passes at least one of the conditions:</p>
 {% highlight scala %}
 pattern.where(event => ... /* some condition */)
@@ -555,10 +587,22 @@ pattern.where(event => ... /* some condition */)
 {% endhighlight %}
                     </td>
                 </tr>
+<tr>
+          <td><strong>until(condition)</strong></td>
+          <td>
+              <p>Specifies a stop condition for looping pattern. Meaning if event matching the given condition occurs, no more
+              events will be accepted into the pattern.</p>
+              <p>Applicable only in conjunction with <code>oneOrMore()</code></p>
+              <p><b>NOTE:</b> It allows for cleaning state for corresponding pattern on event-based condition.</p>
+{% highlight scala %}
+pattern.oneOrMore().until(event => ... /* some condition */)
+{% endhighlight %}
+          </td>
+       </tr>
        <tr>
            <td><strong>subtype(subClass)</strong></td>
            <td>
-               <p>Defines a subtype condition for the current pattern. An event can only match the pattern if it is 
+               <p>Defines a subtype condition for the current pattern. An event can only match the pattern if it is
                of this subtype:</p>
 {% highlight scala %}
 pattern.subtype(classOf[SubEvent])
@@ -571,6 +615,7 @@ pattern.subtype(classOf[SubEvent])
                <p>Specifies that this pattern expects at least one occurrence of a matching event.</p>
                             <p>By default a relaxed internal contiguity (between subsequent events) is used. For more info on
                             internal contiguity see <a href="#consecutive_scala">consecutive</a>.</p>
+                            <p><b>NOTE:</b> It is advised to use either <code>until()</code> or <code>within()</code> to enable state clearing</p>
 {% highlight scala %}
 pattern.oneOrMore()
 {% endhighlight %}
@@ -580,7 +625,7 @@ pattern.oneOrMore()
                  <td><strong>times(#ofTimes)</strong></td>
                  <td>
                      <p>Specifies that this pattern expects an exact number of occurrences of a matching event.</p>
-                                   <p>By default a relaxed internal contiguity (between subsequent events) is used. 
+                                   <p>By default a relaxed internal contiguity (between subsequent events) is used.
                                    For more info on internal contiguity see <a href="#consecutive_scala">consecutive</a>.</p>
 {% highlight scala %}
 pattern.times(2)
@@ -590,7 +635,7 @@ pattern.times(2)
        <tr>
           <td><strong>optional()</strong></td>
           <td>
-             <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all 
+             <p>Specifies that this pattern is optional, i.e. it may not occur at all. This is applicable to all
                            aforementioned quantifiers.</p>
 {% highlight scala %}
 pattern.oneOrMore().optional()
@@ -603,8 +648,8 @@ pattern.oneOrMore().optional()
             <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes strict contiguity between the matching
                           events, i.e. any non-matching element breaks the match (as in <code>next()</code>).</p>
                           <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>
-            
-      <p>E.g. a pattern like:</p> 
+
+      <p>E.g. a pattern like:</p>
 {% highlight scala %}
 Pattern.begin("start").where(_.getName().equals("c"))
   .followedBy("middle").where(_.getName().equals("a"))
@@ -613,7 +658,7 @@ Pattern.begin("start").where(_.getName().equals("c"))
 {% endhighlight %}
 
             <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
-                        
+
                           <p>with consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}</p>
                           <p>without consecutive applied: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
           </td>
@@ -624,7 +669,7 @@ Pattern.begin("start").where(_.getName().equals("c"))
                 <p>Works in conjunction with <code>oneOrMore()</code> and <code>times()</code> and imposes non-deterministic relaxed contiguity
                      between the matching events (as in <code>followedByAny()</code>).</p>
                      <p>If not applied a relaxed contiguity (as in <code>followedBy()</code>) is used.</p>
-                          
+
       <p>E.g. a pattern like:</p>
 {% highlight scala %}
 Pattern.begin("start").where(_.getName().equals("c"))
@@ -632,9 +677,9 @@ Pattern.begin("start").where(_.getName().equals("c"))
                        .oneOrMore().allowCombinations()
   .followedBy("end1").where(_.getName().equals("b"));
 {% endhighlight %}
-                     
+
                       <p>Will generate the following matches for an input sequence: C D A1 A2 A3 D A4 B</p>
-                          
+
                       <p>with combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A3 B}, {C A1 A4 B}, {C A1 A2 A3 B}, {C A1 A2 A4 B}, {C A1 A3 A4 B}, {C A1 A2 A3 A4 B}</p>
                       <p>without combinations enabled: {C A1 B}, {C A1 A2 B}, {C A1 A2 A3 B}, {C A1 A2 A3 A4 B}</p>
               </td>
@@ -647,7 +692,7 @@ Pattern.begin("start").where(_.getName().equals("c"))
 
 ### Combining Patterns
 
-Now that we have seen what an individual pattern can look like, it is time to see how to combine them 
+Now that we have seen what an individual pattern can look like, it is time to see how to combine them
 into a full pattern sequence.
 
 A pattern sequence has to start with an initial pattern, as shown below:
@@ -666,16 +711,16 @@ val start : Pattern[Event, _] = Pattern.begin("start")
 </div>
 </div>
 
-Next, you can append more patterns to your pattern sequence by specifying the desired *contiguity conditions* between 
+Next, you can append more patterns to your pattern sequence by specifying the desired *contiguity conditions* between
 them. In the [previous section](#conditions-on-contiguity) we described the different contiguity modes supported by
-Flink, namely *strict*, *relaxed*, and *non-deterministic relaxed*, and how to apply them in looping patterns. To apply 
-them between consecutive patterns, you can use: 
+Flink, namely *strict*, *relaxed*, and *non-deterministic relaxed*, and how to apply them in looping patterns. To apply
+them between consecutive patterns, you can use:
 
-1. `next()`, for *strict*, 
-2. `followedBy()`, for *relaxed*, and 
+1. `next()`, for *strict*,
+2. `followedBy()`, for *relaxed*, and
 3. `followedByAny()`, for *non-deterministic relaxed* contiguity.
 
-or 
+or
 
 1. `notNext()`, if you do not want an event type to directly follow another
 2. `notFollowedBy()`, if you do not want an event type to be anywhere between two other event types
@@ -734,13 +779,13 @@ a pattern `a b`, given the event sequence `"a", "c", "b1", "b2"`, will give the
 
 1. Strict Contiguity between `a` and `b`: `{}` (no match) -- the `"c"` after `"a"` causes `"a"` to be discarded.
 
-2. Relaxed Contiguity between `a` and `b`: `{a b1}` -- as relaxed continuity is viewed as "skip non-matching events 
+2. Relaxed Contiguity between `a` and `b`: `{a b1}` -- as relaxed continuity is viewed as "skip non-matching events
 till the next matching one".
- 
+
 3. Non-Deterministic Relaxed Contiguity between `a` and `b`: `{a b1}`, `{a b2}` -- as this is the most general form.
 
 Finally, it is also possible to define a temporal constraint for the pattern to be valid.
-For example, you can define that a pattern should occur within 10 seconds via the `pattern.within()` method. 
+For example, you can define that a pattern should occur within 10 seconds via the `pattern.within()` method.
 Temporal patterns are supported for both [processing and event time]({{site.baseurl}}/dev/event_time.html).
 
 {% warn Attention %} A pattern sequence can only have one temporal constraint. If
@@ -784,7 +829,7 @@ Pattern<Event, ?> start = Pattern.<Event>begin("start");
         <tr>
             <td><strong>next()</strong></td>
             <td>
-                <p>Appends a new pattern. A matching event has to directly succeed the previous matching event 
+                <p>Appends a new pattern. A matching event has to directly succeed the previous matching event
                 (strict contiguity):</p>
 {% highlight java %}
 Pattern<Event, ?> next = start.next("middle");
@@ -794,7 +839,7 @@ Pattern<Event, ?> next = start.next("middle");
         <tr>
             <td><strong>followedBy()</strong></td>
             <td>
-                <p>Appends a new pattern. Other events can occur between a matching event and the previous 
+                <p>Appends a new pattern. Other events can occur between a matching event and the previous
                 matching event (relaxed contiguity):</p>
 {% highlight java %}
 Pattern<Event, ?> followedBy = start.followedBy("middle");
@@ -804,8 +849,8 @@ Pattern<Event, ?> followedBy = start.followedBy("middle");
         <tr>
             <td><strong>followedByAny()</strong></td>
             <td>
-                <p>Appends a new pattern. Other events can occur between a matching event and the previous 
-                matching event, and alternative matches will be presented for every alternative matching event 
+                <p>Appends a new pattern. Other events can occur between a matching event and the previous
+                matching event, and alternative matches will be presented for every alternative matching event
                 (non-deterministic relaxed contiguity):</p>
 {% highlight java %}
 Pattern<Event, ?> followedByAny = start.followedByAny("middle");
@@ -815,7 +860,7 @@ Pattern<Event, ?> followedByAny = start.followedByAny("middle");
         <tr>
                     <td><strong>notNext()</strong></td>
                     <td>
-                        <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the 
+                        <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the
                         previous matching event (strict contiguity) for the partial match to be discarded:</p>
 {% highlight java %}
 Pattern<Event, ?> notNext = start.notNext("not");
@@ -826,7 +871,7 @@ Pattern<Event, ?> notNext = start.notNext("not");
                     <td><strong>notFollowedBy()</strong></td>
                     <td>
                         <p>Appends a new negative pattern. A partial matching event sequence will be discarded even
-                        if other events occur between the matching (negative) event and the previous matching event 
+                        if other events occur between the matching (negative) event and the previous matching event
                         (relaxed contiguity):</p>
 {% highlight java %}
 Pattern<Event, ?> notFollowedBy = start.notFllowedBy("not");
@@ -836,7 +881,7 @@ Pattern<Event, ?> notFollowedBy = start.notFllowedBy("not");
        <tr>
           <td><strong>within(time)</strong></td>
           <td>
-              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event 
+              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event
               sequence exceeds this time, it is discarded:</p>
 {% highlight java %}
 pattern.within(Time.seconds(10));
@@ -868,7 +913,7 @@ val start = Pattern.begin[Event]("start")
         <tr>
             <td><strong>next()</strong></td>
             <td>
-                <p>Appends a new pattern. A matching event has to directly succeed the previous matching event 
+                <p>Appends a new pattern. A matching event has to directly succeed the previous matching event
                 (strict contiguity):</p>
 {% highlight scala %}
 val next = start.next("middle")
@@ -878,7 +923,7 @@ val next = start.next("middle")
         <tr>
             <td><strong>followedBy()</strong></td>
             <td>
-                <p>Appends a new pattern. Other events can occur between a matching event and the previous 
+                <p>Appends a new pattern. Other events can occur between a matching event and the previous
                 matching event (relaxed contiguity) :</p>
 {% highlight scala %}
 val followedBy = start.followedBy("middle")
@@ -888,19 +933,19 @@ val followedBy = start.followedBy("middle")
         <tr>
                     <td><strong>followedByAny()</strong></td>
                     <td>
-                        <p>Appends a new pattern. Other events can occur between a matching event and the previous 
-                        matching event, and alternative matches will be presented for every alternative matching event 
+                        <p>Appends a new pattern. Other events can occur between a matching event and the previous
+                        matching event, and alternative matches will be presented for every alternative matching event
                         (non-deterministic relaxed contiguity):</p>
 {% highlight scala %}
 val followedByAny = start.followedByAny("middle");
 {% endhighlight %}
                             </td>
                 </tr>
-                
+
                 <tr>
                                     <td><strong>notNext()</strong></td>
                                     <td>
-                                        <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the 
+                                        <p>Appends a new negative pattern. A matching (negative) event has to directly succeed the
                                         previous matching event (strict contiguity) for the partial match to be discarded:</p>
 {% highlight scala %}
 val notNext = start.notNext("not")
@@ -911,18 +956,18 @@ val notNext = start.notNext("not")
                                     <td><strong>notFollowedBy()</strong></td>
                                     <td>
                                         <p>Appends a new negative pattern. A partial matching event sequence will be discarded even
-                                        if other events occur between the matching (negative) event and the previous matching event 
+                                        if other events occur between the matching (negative) event and the previous matching event
                                         (relaxed contiguity):</p>
 {% highlight scala %}
 val notFollowedBy = start.notFllowedBy("not")
 {% endhighlight %}
                                     </td>
                                 </tr>
-        
+
        <tr>
           <td><strong>within(time)</strong></td>
           <td>
-              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event 
+              <p>Defines the maximum time interval for an event sequence to match the pattern. If a non-completed event
               sequence exceeds this time, it is discarded:</p>
 {% highlight scala %}
 pattern.within(Time.seconds(10))
@@ -937,7 +982,7 @@ pattern.within(Time.seconds(10))
 
 ## Detecting Patterns
 
-After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect 
+After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect
 potential matches. In order to run a stream of events against your pattern sequence, you have to create a `PatternStream`.
 Given an input stream `input` and a pattern `pattern`, you create the `PatternStream` by calling:
 
@@ -974,10 +1019,10 @@ Once you have obtained a `PatternStream` you can select from detected event sequ
 <div data-lang="java" markdown="1">
 The `select()` method requires a `PatternSelectFunction` implementation.
 A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
-It receives a match in the form of `Map<String, List<IN>>` where the key is the name of each pattern in your pattern 
-sequence and the value is a list of all accepted events for that pattern (`IN` is the type of your input elements). 
-The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each 
-pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a 
+It receives a match in the form of `Map<String, List<IN>>` where the key is the name of each pattern in your pattern
+sequence and the value is a list of all accepted events for that pattern (`IN` is the type of your input elements).
+The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each
+pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a
 given pattern. The selection function returns exactly one result.
 
 {% highlight java %}
@@ -991,8 +1036,8 @@ class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT>
 }
 {% endhighlight %}
 
-A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an 
-arbitrary number of results. In order to do this, the `select` method has an additional `Collector` parameter which is 
+A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an
+arbitrary number of results. In order to do this, the `select` method has an additional `Collector` parameter which is
 used to forward your output elements downstream.
 
 {% highlight java %}
@@ -1012,10 +1057,10 @@ class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<
 
 <div data-lang="scala" markdown="1">
 The `select()` method takes a selection function as argument, which is called for each matching event sequence.
-It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each pattern in your pattern 
-sequence and the value is an Iterable over all accepted events for that pattern (`IN` is the type of your input elements). 
-The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each 
-pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a 
+It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each pattern in your pattern
+sequence and the value is an Iterable over all accepted events for that pattern (`IN` is the type of your input elements).
+The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each
+pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a
 given pattern. The selection function returns exactly one result per call.
 
 {% highlight scala %}
@@ -1026,8 +1071,8 @@ def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
 }
 {% endhighlight %}
 
-The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the 
-`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for 
+The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the
+`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for
 `flatSelect` has an additional `Collector` parameter which is used to forward your output elements downstream.
 
 {% highlight scala %}
@@ -1044,18 +1089,18 @@ def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]
 
 ### Handling Timed Out Partial Patterns
 
-Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences 
-are discarded because they exceed the window length. In order to react to these timed out partial matches the `select` 
-and `flatSelect` API calls allow a timeout handler to be specified. This timeout handler is called for each timed out 
-partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and 
+Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
+are discarded because they exceed the window length. In order to react to these timed out partial matches the `select`
+and `flatSelect` API calls allow a timeout handler to be specified. This timeout handler is called for each timed out
+partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and
 the timestamp when the timeout was detected.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
-In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as 
-the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known 
-`PatternSelectFunction`/`PatternFlatSelectFunction`. The return type of the timeout function can be different from the 
-select function. The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively 
+In order to treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
+the first parameter a `PatternTimeoutFunction`/`PatternFlatTimeoutFunction` and as second parameter the known
+`PatternSelectFunction`/`PatternFlatSelectFunction`. The return type of the timeout function can be different from the
+select function. The timeout event and the select event are wrapped in `Either.Left` and `Either.Right` respectively
 so that the resulting data stream is of type `org.apache.flink.types.Either`.
 
 {% highlight java %}
@@ -1114,19 +1159,19 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = patternStream.flatSelect
 ## Handling Lateness in Event Time
 
 In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order
-when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending 
-order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller 
-than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order. 
+when working in event time, an incoming element is initially put in a buffer where elements are *sorted in ascending
+order based on their timestamp*, and when a watermark arrives, all the elements in this buffer with timestamps smaller
+than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order.
 
 {% warn Attention %} The library assumes correctness of the watermark when working in event time.
 
-To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes 
-*correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last 
+To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes
+*correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last
 seen watermark. Late elements are not further processed.
 
 ## Examples
 
-The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data 
+The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data
 stream of `Events`. The events are keyed by their ids and a valid pattern has to occur within 10 seconds.
 The whole processing is done with event time.
 
@@ -1192,20 +1237,20 @@ val alerts = patternStream.select(createAlert(_)))
 
 ## Migrating from an older Flink version
 
-The CEP library in Flink-1.3 ships with a number of new features which have led to some changes in the API. Here we 
-describe the changes that you need to make to your old CEP jobs, in order to be able to run them with Flink-1.3. After 
-making these changes and recompiling your job, you will be able to resume its execution from a savepoint taken with the 
+The CEP library in Flink-1.3 ships with a number of new features which have led to some changes in the API. Here we
+describe the changes that you need to make to your old CEP jobs, in order to be able to run them with Flink-1.3. After
+making these changes and recompiling your job, you will be able to resume its execution from a savepoint taken with the
 old version of your job, *i.e.* without having to re-process your past data.
 
 The changes required are:
 
-1. Change your conditions (the ones in the `where(...)` clause) to extend the `SimpleCondition` class instead of 
+1. Change your conditions (the ones in the `where(...)` clause) to extend the `SimpleCondition` class instead of
 implementing the `FilterFunction` interface.
 
 2. Change your functions provided as arguments to the `select(...)` and `flatSelect(...)` methods to expect a list of
 events associated with each pattern (`List` in `Java`, `Iterable` in `Scala`). This is because with the addition of
 the looping patterns, multiple input events can match a single (looping) pattern.
 
-3. The `followedBy()` in Flink 1.1 and 1.2 implied `non-deterministic relaxed contiguity` (see 
-[here](#conditions-on-contiguity)). In Flink 1.3 this has changed and `followedBy()` implies `relaxed contiguity`, 
-while `followedByAny()` should be used if `non-deterministic relaxed contiguity` is required.
\ No newline at end of file
+3. The `followedBy()` in Flink 1.1 and 1.2 implied `non-deterministic relaxed contiguity` (see
+[here](#conditions-on-contiguity)). In Flink 1.3 this has changed and `followedBy()` implies `relaxed contiguity`,
+while `followedByAny()` should be used if `non-deterministic relaxed contiguity` is required.

http://git-wip-us.apache.org/repos/asf/flink/blob/34d14652/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index 270b2f5..644da5e 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -75,6 +75,10 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
     Option(jPattern.getCondition)
   }
 
+  def getUntilCondition: Option[IterativeCondition[F]] = {
+    Option(jPattern.getUntilCondition)
+  }
+
   /**
     * Adds a condition that has to be satisfied by an event
     * in order to be considered a match. If another condition has already been
@@ -199,6 +203,51 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+    * Applies a stop condition for a looping state. It allows cleaning the underlying state.
+    *
+    * @param untilCondition a condition an event has to satisfy to stop collecting events into
+    *                       looping state
+    * @return The same pattern with applied untilCondition
+    */
+  def until(untilCondition: IterativeCondition[F]): Pattern[T, F] = {
+    jPattern.until(untilCondition)
+    this
+  }
+
+  /**
+    * Applies a stop condition for a looping state. It allows cleaning the underlying state.
+    *
+    * @param untilCondition a condition an event has to satisfy to stop collecting events into
+    *                       looping state
+    * @return The same pattern with applied untilCondition
+    */
+  def until(untilCondition: (F, Context[F]) => Boolean): Pattern[T, F] = {
+    val condFun = new IterativeCondition[F] {
+      val cleanCond = cep.scala.cleanClosure(untilCondition)
+
+      override def filter(value: F, ctx: JContext[F]): Boolean =
+        cleanCond(value, new JContextWrapper(ctx))
+    }
+    until(condFun)
+  }
+
+  /**
+    * Applies a stop condition for a looping state. It allows cleaning the underlying state.
+    *
+    * @param untilCondition a condition an event has to satisfy to stop collecting events into
+    *                       looping state
+    * @return The same pattern with applied untilCondition
+    */
+  def until(untilCondition: F => Boolean): Pattern[T, F] = {
+    val condFun = new IterativeCondition[F] {
+      val cleanCond = cep.scala.cleanClosure(untilCondition)
+
+      override def filter(value: F, ctx: JContext[F]): Boolean = cleanCond(value)
+    }
+    until(condFun)
+  }
+
+  /**
     * Defines the maximum time interval in which a matching pattern has to be completed in
     * order to be considered valid. This interval corresponds to the maximum time gap between first
     * and the last event.

http://git-wip-us.apache.org/repos/asf/flink/blob/34d14652/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
index b5a437b..c28390e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
@@ -29,6 +29,7 @@ import org.apache.flink.cep.pattern.MalformedPatternException;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.cep.pattern.Quantifier;
 import org.apache.flink.cep.pattern.Quantifier.Times;
+import org.apache.flink.cep.pattern.conditions.AndCondition;
 import org.apache.flink.cep.pattern.conditions.BooleanConditions;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.cep.pattern.conditions.NotCondition;
@@ -463,19 +464,25 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createLooping(final State<T> sinkState) {
-			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
-			final IterativeCondition<T> ignoreCondition = getInnerIgnoreCondition(currentPattern);
-			final IterativeCondition<T> trueFunction = BooleanConditions.trueFunction();
+			final IterativeCondition<T> untilCondition = (IterativeCondition<T>) currentPattern.getUntilCondition();
+
+			final IterativeCondition<T> ignoreCondition = extendWithUntilCondition(
+				getInnerIgnoreCondition(currentPattern),
+				untilCondition);
+			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
+				(IterativeCondition<T>) currentPattern.getCondition(),
+				untilCondition);
 
+			final IterativeCondition<T> proceedCondition = BooleanConditions.trueFunction();
 			final State<T> loopingState = createState(currentPattern.getName(), State.StateType.Normal);
-			loopingState.addProceed(sinkState, trueFunction);
-			loopingState.addTake(currentCondition);
+			loopingState.addProceed(sinkState, proceedCondition);
+			loopingState.addTake(takeCondition);
 
 			addStopStateToLooping(loopingState);
 
 			if (ignoreCondition != null) {
 				final State<T> ignoreState = createState(currentPattern.getName(), State.StateType.Normal);
-				ignoreState.addTake(loopingState, currentCondition);
+				ignoreState.addTake(loopingState, takeCondition);
 				ignoreState.addIgnore(ignoreCondition);
 				loopingState.addIgnore(ignoreState, ignoreCondition);
 
@@ -493,10 +500,13 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createInitMandatoryStateOfOneOrMore(final State<T> sinkState) {
-			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
+				(IterativeCondition<T>) currentPattern.getCondition(),
+				(IterativeCondition<T>) currentPattern.getUntilCondition()
+			);
 
 			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
-			firstState.addTake(sinkState, currentCondition);
+			firstState.addTake(sinkState, takeCondition);
 
 			final IterativeCondition<T> ignoreCondition = getIgnoreCondition(currentPattern);
 			if (ignoreCondition != null) {
@@ -514,18 +524,21 @@ public class NFACompiler {
 		 */
 		@SuppressWarnings("unchecked")
 		private State<T> createInitOptionalStateOfZeroOrMore(final State<T> loopingState, final State<T> lastSink) {
-			final IterativeCondition<T> currentCondition = (IterativeCondition<T>) currentPattern.getCondition();
+			final IterativeCondition<T> takeCondition = extendWithUntilCondition(
+				(IterativeCondition<T>) currentPattern.getCondition(),
+				(IterativeCondition<T>) currentPattern.getUntilCondition()
+			);
 
 			final State<T> firstState = createState(currentPattern.getName(), State.StateType.Normal);
 			firstState.addProceed(lastSink, BooleanConditions.<T>trueFunction());
-			firstState.addTake(loopingState, currentCondition);
+			firstState.addTake(loopingState, takeCondition);
 
 			final IterativeCondition<T> ignoreFunction = getIgnoreCondition(currentPattern);
 			if (ignoreFunction != null) {
 				final State<T> firstStateWithoutProceed = createState(currentPattern.getName(), State.StateType.Normal);
 				firstState.addIgnore(firstStateWithoutProceed, ignoreFunction);
 				firstStateWithoutProceed.addIgnore(ignoreFunction);
-				firstStateWithoutProceed.addTake(loopingState, currentCondition);
+				firstStateWithoutProceed.addTake(loopingState, takeCondition);
 
 				addStopStates(firstStateWithoutProceed);
 			}
@@ -533,6 +546,26 @@ public class NFACompiler {
 		}
 
 		/**
+		 * This method extends the given condition with stop(until) condition if necessary.
+		 * The until condition needs to be applied only if both of the given conditions are not null.
+		 *
+		 * @param condition the condition to extend
+		 * @param untilCondition the until condition to join with the given condition
+		 * @return condition with AND applied or the original condition
+		 */
+		private IterativeCondition<T> extendWithUntilCondition(
+				IterativeCondition<T> condition,
+				IterativeCondition<T> untilCondition) {
+			if (untilCondition != null && condition != null) {
+				return new AndCondition<>(new NotCondition<>(untilCondition), condition);
+			} else if (untilCondition != null) {
+				return new NotCondition<>(untilCondition);
+			}
+
+			return condition;
+		}
+
+		/**
 		 * @return The {@link IterativeCondition condition} for the {@code IGNORE} edge
 		 * that corresponds to the specified {@link Pattern}. It is applicable only for inner states of a complex
 		 * state like looping or times.

http://git-wip-us.apache.org/repos/asf/flink/blob/34d14652/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 8767a94..1131318 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -61,6 +61,9 @@ public class Pattern<T, F extends T> {
 	/** A quantifier for the pattern. By default set to {@link Quantifier#one(ConsumingStrategy)}. */
 	private Quantifier quantifier = Quantifier.one(ConsumingStrategy.STRICT);
 
+	/** The condition an event has to satisfy to stop collecting events into looping state. */
+	private IterativeCondition<F> untilCondition;
+
 	/**
 	 * Applicable to a {@code times} pattern, and holds
 	 * the number of times it has to appear.
@@ -105,6 +108,10 @@ public class Pattern<T, F extends T> {
 		return condition;
 	}
 
+	public IterativeCondition<F> getUntilCondition() {
+		return untilCondition;
+	}
+
 	/**
 	 * Starts a new pattern sequence. The provided name is the one of the initial pattern
 	 * of the new sequence. Furthermore, the base type of the event sequence is set.
@@ -186,6 +193,29 @@ public class Pattern<T, F extends T> {
 	}
 
 	/**
+	 * Applies a stop condition for a looping state. It allows cleaning the underlying state.
+	 *
+	 * @param untilCondition a condition an event has to satisfy to stop collecting events into looping state
+	 * @return The same pattern with applied untilCondition
+	 */
+	public Pattern<T, F> until(IterativeCondition<F> untilCondition) {
+		Preconditions.checkNotNull(untilCondition, "The condition cannot be null");
+
+		if (this.untilCondition != null) {
+			throw new MalformedPatternException("Only one until condition can be applied.");
+		}
+
+		if (!quantifier.hasProperty(Quantifier.QuantifierProperty.LOOPING)) {
+			throw new MalformedPatternException("The until condition is only applicable to looping states.");
+		}
+
+		ClosureCleaner.clean(untilCondition, true);
+		this.untilCondition = untilCondition;
+
+		return this;
+	}
+
+	/**
 	 * Defines the maximum time interval in which a matching pattern has to be completed in
 	 * order to be considered valid. This interval corresponds to the maximum time gap between first
 	 * and the last event.

http://git-wip-us.apache.org/repos/asf/flink/blob/34d14652/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 506587b..a83eb12 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -84,6 +84,42 @@ public class NFAITCase extends TestLogger {
 	}
 
 	@Test
+	public void testNoConditionLoopingNFA() {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event a = new Event(40, "a", 1.0);
+		Event b = new Event(41, "b", 2.0);
+		Event c = new Event(42, "c", 3.0);
+		Event d = new Event(43, "d", 4.0);
+		Event e = new Event(44, "e", 5.0);
+
+		inputEvents.add(new StreamRecord<>(a, 1));
+		inputEvents.add(new StreamRecord<>(b, 2));
+		inputEvents.add(new StreamRecord<>(c, 3));
+		inputEvents.add(new StreamRecord<>(d, 4));
+		inputEvents.add(new StreamRecord<>(e, 5));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").followedBy("end").oneOrMore();
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(a, b, c, d, e),
+			Lists.newArrayList(a, b, c, d),
+			Lists.newArrayList(a, b, c),
+			Lists.newArrayList(a, b),
+			Lists.newArrayList(b, c, d, e),
+			Lists.newArrayList(b, c, d),
+			Lists.newArrayList(b, c),
+			Lists.newArrayList(c, d, e),
+			Lists.newArrayList(c, d),
+			Lists.newArrayList(d, e)
+		));
+	}
+
+	@Test
 	public void testAnyWithNoConditionNFA() {
 		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/34d14652/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
new file mode 100644
index 0000000..d56e883
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/UntilConditionITCase.java
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.cep.pattern.conditions.SimpleCondition;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import com.google.common.collect.Lists;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.cep.nfa.NFATestUtilities.compareMaps;
+import static org.apache.flink.cep.nfa.NFATestUtilities.feedNFA;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for {@link Pattern#until(IterativeCondition)}.
+ */
+public class UntilConditionITCase {
+
+
+	/**
+	 * Condition used for {@link Pattern#until(IterativeCondition)} clause.
+	 */
+	public static final SimpleCondition<Event> UNTIL_CONDITION = new SimpleCondition<Event>() {
+		private static final long serialVersionUID = 5726188262756267490L;
+
+		@Override
+		public boolean filter(Event value) throws Exception {
+			return value.getPrice() == 5.0;
+		}
+	};
+
+	@Test
+	public void testUntilConditionFollowedByOneOrMore() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event ignored = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(ignored, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().until(UNTIL_CONDITION)
+			.followedBy("end").where(
+				UNTIL_CONDITION
+			);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, breaking)
+		));
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testUntilConditionFollowedByOneOrMoreCombinations() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event ignored = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(ignored, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().allowCombinations().until(UNTIL_CONDITION)
+			.followedBy("end").where(UNTIL_CONDITION);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, breaking)
+		));
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testUntilConditionFollowedByOneOrMoreConsecutive() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event breaking = new Event(45, "a", 5.0);
+		Event ignored = new Event(46, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(breaking, 7));
+		inputEvents.add(new StreamRecord<>(ignored, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().consecutive().until(UNTIL_CONDITION)
+			.followedBy("end").where(
+				UNTIL_CONDITION
+			);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, breaking)
+		));
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testUntilConditionFollowedByZeroOrMore() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event ignored = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(ignored, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().until(UNTIL_CONDITION)
+			.followedBy("end").where(
+				UNTIL_CONDITION
+			);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, breaking),
+			Lists.newArrayList(startEvent, breaking)
+		));
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testUntilConditionFollowedByZeroOrMoreCombinations() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(43, "a", 4.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event ignored = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(ignored, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().allowCombinations().until(UNTIL_CONDITION)
+			.followedBy("end").where(UNTIL_CONDITION);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent3, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, breaking),
+			Lists.newArrayList(startEvent, breaking)
+		));
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testUntilConditionFollowedByZeroOrMoreConsecutive() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event breaking = new Event(45, "a", 5.0);
+		Event ignored = new Event(46, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(breaking, 7));
+		inputEvents.add(new StreamRecord<>(ignored, 8));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().consecutive().until(UNTIL_CONDITION)
+			.followedBy("end").where(
+				UNTIL_CONDITION
+			);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, breaking),
+			Lists.newArrayList(startEvent, middleEvent1, breaking),
+			Lists.newArrayList(startEvent, breaking)
+		));
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testUntilConditionFollowedByAnyOneOrMore() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event middleEvent3 = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().until(UNTIL_CONDITION);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent1),
+			Lists.newArrayList(startEvent, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent3)
+		));
+	}
+
+	@Test
+	public void testUntilConditionFollowedByAnyZeroOrMore() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event middleEvent3 = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedByAny("middle").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("a");
+			}
+		}).oneOrMore().optional().until(UNTIL_CONDITION);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent1),
+			Lists.newArrayList(startEvent, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent3),
+			Lists.newArrayList(startEvent)
+		));
+	}
+
+	@Test
+	public void testUntilConditionWithEmptyWhere() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(40, "d", 1.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event ignored = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(ignored, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").oneOrMore().until(UNTIL_CONDITION);
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent1)
+		));
+
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testIterativeUntilConditionOneOrMore() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(40, "d", 1.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event ignored = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(ignored, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").oneOrMore().until(new IterativeCondition<Event>() {
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+
+				double sum = 0;
+				for (Event middle : ctx.getEventsForPattern("middle")) {
+					sum += middle.getPrice();
+				}
+
+				return sum == 6.0;
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent1)
+		));
+
+		assertTrue(nfa.isEmpty());
+	}
+
+	@Test
+	public void testIterativeUntilConditionZeroOrMore() throws Exception {
+		List<StreamRecord<Event>> inputEvents = new ArrayList<>();
+
+		Event startEvent = new Event(40, "c", 1.0);
+		Event middleEvent1 = new Event(41, "a", 2.0);
+		Event middleEvent2 = new Event(42, "a", 3.0);
+		Event middleEvent3 = new Event(40, "d", 1.0);
+		Event breaking = new Event(44, "a", 5.0);
+		Event ignored = new Event(45, "a", 6.0);
+
+		inputEvents.add(new StreamRecord<>(startEvent, 1));
+		inputEvents.add(new StreamRecord<>(middleEvent1, 3));
+		inputEvents.add(new StreamRecord<>(middleEvent2, 4));
+		inputEvents.add(new StreamRecord<>(middleEvent3, 5));
+		inputEvents.add(new StreamRecord<>(breaking, 6));
+		inputEvents.add(new StreamRecord<>(ignored, 7));
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+			private static final long serialVersionUID = 5726188262756267490L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return value.getName().equals("c");
+			}
+		}).followedBy("middle").oneOrMore().optional().until(new IterativeCondition<Event>() {
+			@Override
+			public boolean filter(Event value, Context<Event> ctx) throws Exception {
+
+				double sum = 0;
+				for (Event middle : ctx.getEventsForPattern("middle")) {
+					sum += middle.getPrice();
+				}
+
+				return sum == 6.0;
+			}
+		});
+
+		NFA<Event> nfa = NFACompiler.compile(pattern, Event.createTypeSerializer(), false);
+
+		final List<List<Event>> resultingPatterns = feedNFA(inputEvents, nfa);
+
+		compareMaps(resultingPatterns, Lists.<List<Event>>newArrayList(
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2, middleEvent3),
+			Lists.newArrayList(startEvent, middleEvent1, middleEvent2),
+			Lists.newArrayList(startEvent, middleEvent1),
+			Lists.newArrayList(startEvent)
+		));
+
+		assertTrue(nfa.isEmpty());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/34d14652/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index e00384b..999e5f3 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -257,6 +257,24 @@ public class PatternTest extends TestLogger {
 		Pattern.begin("start").where(dummyCondition()).notFollowedBy("not").where(dummyCondition()).optional();
 	}
 
+	@Test(expected = MalformedPatternException.class)
+	public void testUntilCannotBeAppliedToTimes() throws Exception {
+
+		Pattern.begin("start").where(dummyCondition()).times(1).until(dummyCondition());
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testUntilCannotBeAppliedToSingleton() throws Exception {
+
+		Pattern.begin("start").where(dummyCondition()).until(dummyCondition());
+	}
+
+	@Test(expected = MalformedPatternException.class)
+	public void testUntilCannotBeAppliedTwice() throws Exception {
+
+		Pattern.begin("start").where(dummyCondition()).until(dummyCondition()).until(dummyCondition());
+	}
+
 	private SimpleCondition<Object> dummyCondition() {
 		return new SimpleCondition<Object>() {
 			private static final long serialVersionUID = -2205071036073867531L;