Posted to commits@flink.apache.org by dw...@apache.org on 2018/12/30 14:16:27 UTC

[flink] branch master updated (33f6d06 -> 54fea60)

This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 33f6d06  [FLINK-7208][table] Optimize Min/MaxWithRetractAggFunction with DataView
     new 1ccc740  [hotfix][cep] Fixed timestamp for timed out partial matches
     new 822b17f  [hotfix][cep] Fixed argument list code style
     new 6f5b9c9  [hotfix][cep] Added test event builder
     new 92eb82b  [hotfix][cep] Added test harness output asserter
     new 4496809  [FLINK-10596][cep] Introduced PatternProcessFunction
     new bec4a28  [hotfix][cep] Fixed passing runtime context
     new 2b8edec  [FLINK-10596][cep][docs] Updated cep docs with PatternProcessFunction
     new 54fea60  [hotfix][cep] Made PatternStream immutable & changed PatternStreamBuilder to a proper builder

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 docs/dev/libs/cep.md                               | 158 +++-----
 .../org/apache/flink/cep/scala/PatternStream.scala |  25 +-
 ...ernStreamScalaJavaAPIInteroperabilityTest.scala |  95 +++--
 .../src/main/java/org/apache/flink/cep/CEP.java    |   8 +-
 .../flink/cep/PatternProcessFunctionBuilder.java   | 147 +++++++
 .../java/org/apache/flink/cep/PatternStream.java   | 288 +++++++-------
 .../org/apache/flink/cep/PatternStreamBuilder.java | 154 ++++++++
 .../flink/cep/RichPatternFlatSelectFunction.java   |  15 +-
 .../flink/cep/RichPatternSelectFunction.java       |  15 +-
 .../org/apache/flink/cep/context/TimerContext.java |  21 +-
 .../cep/functions/PatternProcessFunction.java      |  76 ++++
 .../cep/functions/TimedOutPartialMatchHandler.java |  81 ++++
 .../adaptors/PatternFlatSelectAdapter.java         |  63 +++
 .../functions/adaptors/PatternSelectAdapter.java   |  63 +++
 .../adaptors/PatternTimeoutFlatSelectAdapter.java  | 111 ++++++
 .../adaptors/PatternTimeoutSelectAdapter.java      |  79 ++++
 .../main/java/org/apache/flink/cep/nfa/NFA.java    |  39 +-
 .../flink/cep/operator/CEPOperatorUtils.java       | 319 ---------------
 ...yedCEPPatternOperator.java => CepOperator.java} | 161 +++++---
 .../cep/{ => operator}/CepRuntimeContext.java      |  54 +--
 .../flink/cep/operator/FlatSelectCepOperator.java  |  69 ----
 .../cep/operator/FlatSelectTimeoutCepOperator.java | 131 -------
 .../flink/cep/operator/SelectCepOperator.java      |  58 ---
 .../cep/operator/SelectTimeoutCepOperator.java     | 120 ------
 .../operator/TimestampedSideOutputCollector.java   |  82 ----
 .../pattern/conditions/RichIterativeCondition.java |   8 +-
 .../java/org/apache/flink/cep/nfa/NFAITCase.java   |   4 +-
 .../apache/flink/cep/operator/CEPOperatorTest.java |  92 +++--
 .../cep/operator/CepOperatorTestUtilities.java     |  29 +-
 .../operator/CepProcessFunctionContextTest.java    | 428 +++++++++++++++++++++
 .../cep/{ => operator}/CepRuntimeContextTest.java  | 174 +++++----
 .../apache/flink/cep/utils/CepOperatorBuilder.java | 192 +++++++++
 .../org/apache/flink/cep/utils/EventBuilder.java   |  72 ++++
 .../org/apache/flink/cep/utils/OutputAsserter.java |  78 ++++
 34 files changed, 2202 insertions(+), 1307 deletions(-)
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternProcessFunctionBuilder.java
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
 copy flink-core/src/main/java/org/apache/flink/core/memory/SeekableDataOutputView.java => flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java (56%)
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternFlatSelectAdapter.java
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternSelectAdapter.java
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
 create mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
 delete mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
 rename flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/{AbstractKeyedCEPPatternOperator.java => CepOperator.java} (74%)
 rename flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/{ => operator}/CepRuntimeContext.java (75%)
 delete mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
 delete mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
 delete mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
 delete mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
 delete mode 100644 flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
 create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
 rename flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/{ => operator}/CepRuntimeContextTest.java (66%)
 create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/CepOperatorBuilder.java
 create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/EventBuilder.java
 create mode 100644 flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java


[flink] 02/08: [hotfix][cep] Fixed argument list code style

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 822b17fac22edc8b9b800de21b6e10b5b1332ebb
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:29:38 2018 +0100

    [hotfix][cep] Fixed argument list code style
---
 .../flink-cep/src/main/java/org/apache/flink/cep/CEP.java            | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index e684560..b7cb391 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -48,7 +48,10 @@ public class CEP {
 	 * @param <T> Type of the input events
 	 * @return Resulting pattern stream
 	 */
-	public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
+	public static <T> PatternStream<T> pattern(
+			DataStream<T> input,
+			Pattern<T, ?> pattern,
+			EventComparator<T> comparator) {
 		return new PatternStream<>(input, pattern, comparator);
 	}
 }


[flink] 07/08: [FLINK-10596][cep][docs] Updated cep docs with PatternProcessFunction

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2b8edec2e833ed3216dddd2e386e77b2c2d50467
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:20:10 2018 +0100

    [FLINK-10596][cep][docs] Updated cep docs with PatternProcessFunction
---
 docs/dev/libs/cep.md | 158 ++++++++++++++++++++-------------------------------
 1 file changed, 62 insertions(+), 96 deletions(-)

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ce9a9c3..0519dbc 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -101,14 +101,16 @@ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
 
 PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
-DataStream<Alert> result = patternStream.select(
-    new PatternSelectFunction<Event, Alert>() {
+DataStream<Alert> result = patternStream.process(
+    new PatternProcessFunction<Event, Alert>() {
         @Override
-        public Alert select(Map<String, List<Event>> pattern) throws Exception {
-            return createAlertFrom(pattern);
+        public void processMatch(
+                Map<String, List<Event>> pattern,
+                Context ctx,
+                Collector<Alert> out) throws Exception {
+            out.collect(createAlertFrom(pattern));
         }
-    }
-});
+    });
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -121,7 +123,15 @@ val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
 
 val patternStream = CEP.pattern(input, pattern)
 
-val result: DataStream[Alert] = patternStream.select(createAlert(_))
+val result: DataStream[Alert] = patternStream.process(
+    new PatternProcessFunction[Event, Alert]() {
+        override def processMatch(
+              `match`: util.Map[String, util.List[Event]],
+              ctx: PatternProcessFunction.Context,
+              out: Collector[Alert]): Unit = {
+            out.collect(createAlertFrom(`match`))
+        }
+    })
 {% endhighlight %}
 </div>
 </div>
@@ -1477,92 +1487,61 @@ The input stream can be *keyed* or *non-keyed* depending on your use-case.
 
 ### Selecting from Patterns
 
-Once you have obtained a `PatternStream` you can select from detected event sequences via the `select` or `flatSelect` methods.
+Once you have obtained a `PatternStream` you can apply transformations to the detected event sequences. The suggested way of doing that
+is via the `PatternProcessFunction`.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-The `select()` method requires a `PatternSelectFunction` implementation.
-A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
+A `PatternProcessFunction` has a `processMatch` method which is called for each matching event sequence.
 It receives a match in the form of `Map<String, List<IN>>` where the key is the name of each pattern in your pattern
 sequence and the value is a list of all accepted events for that pattern (`IN` is the type of your input elements).
 The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each
-pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result.
+pattern is that when using looping patterns (e.g. `oneOrMore()` and `times()`), more than one event may be accepted for a given pattern.
 
 {% highlight java %}
-class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
     @Override
-    public OUT select(Map<String, List<IN>> pattern) {
-        IN startEvent = pattern.get("start").get(0);
-        IN endEvent = pattern.get("end").get(0);
-        return new OUT(startEvent, endEvent);
+    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+        IN startEvent = match.get("start").get(0);
+        IN endEvent = match.get("end").get(0);
+        out.collect(new OUT(startEvent, endEvent));
     }
 }
 {% endhighlight %}
 
-A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an
-arbitrary number of results. To do this, the `select` method has an additional `Collector` parameter which is
-used to forward your output elements downstream.
+The `PatternProcessFunction` gives access to a `Context` object. Through it, one can access time-related
+characteristics such as `currentProcessingTime` or the `timestamp` of the current match (which is the timestamp of the last element assigned to the match).
+Through this context one can also emit results to a [side-output]({{ site.baseurl }}/dev/stream/side_output.html).
 
-{% highlight java %}
-class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
-    @Override
-    public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
-        IN startEvent = pattern.get("start").get(0);
-        IN endEvent = pattern.get("end").get(0);
 
-        for (int i = 0; i < startEvent.getValue(); i++ ) {
-            collector.collect(new OUT(startEvent, endEvent));
-        }
-    }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-The `select()` method takes a selection function as argument, which is called for each matching event sequence.
-It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each pattern in your pattern
-sequence and the value is an Iterable over all accepted events for that pattern (`IN` is the type of your input elements).
+#### Handling Timed Out Partial Patterns
 
-The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result per call.
-
-{% highlight scala %}
-def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
-    val startEvent = pattern.get("start").get.head
-    val endEvent = pattern.get("end").get.head
-    OUT(startEvent, endEvent)
-}
-{% endhighlight %}
+Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
+are discarded because they exceed the window length. To act upon a timed out partial match one can use the `TimedOutPartialMatchHandler` interface.
+The interface is supposed to be used in a mixin style. This means you can additionally implement this interface with your `PatternProcessFunction`.
+The `TimedOutPartialMatchHandler` provides the additional `processTimedOutMatch` method which will be called for every timed out partial match.
 
-The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the
-`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for
-`flatSelect` has an additional `Collector` parameter which is used to forward your output elements downstream.
+{% highlight java %}
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+    @Override
+    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+        ...
+    }
 
-{% highlight scala %}
-def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
-    val startEvent = pattern.get("start").get.head
-    val endEvent = pattern.get("end").get.head
-    for (i <- 0 to startEvent.getValue) {
-        collector.collect(OUT(startEvent, endEvent))
+    @Override
+    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception {
+        IN startEvent = match.get("start").get(0);
+        ctx.output(outputTag, T(startEvent));
     }
 }
 {% endhighlight %}
-</div>
-</div>
 
-### Handling Timed Out Partial Patterns
+<span class="label label-info">Note</span> The `processTimedOutMatch` does not give access to the main output. You can still emit results
+to [side-outputs]({{ site.baseurl }}/dev/stream/side_output.html) through the `Context` object.
 
-Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
-are discarded because they exceed the window length. To react to these timed out partial matches the `select`
-and `flatSelect` API calls allow you to specify a timeout handler. This timeout handler is called for each timed out
-partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and
-the timestamp when the timeout was detected.
 
-To treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
-parameters
+#### Convenience API
 
- * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
- * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the side output in which the timed out matches will be returned
- * and the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+The aforementioned `PatternProcessFunction` was introduced in Flink 1.8 and since then it has been the recommended way to interact with matches.
+One can still use the old-style API like `select`/`flatSelect`, which will internally be translated into a `PatternProcessFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1572,18 +1551,21 @@ PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
 OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
 
-SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
-    outputTag,
-    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
-    new PatternSelectFunction<Event, ComplexEvent>() {...}
-);
-
-DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
-
 SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
     outputTag,
-    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
-    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
+        public void timeout(
+                Map<String, List<Event>> pattern,
+                long timeoutTimestamp,
+                Collector<TimeoutEvent> out) throws Exception {
+            out.collect(new TimeoutEvent());
+        }
+    },
+    new PatternFlatSelectFunction<Event, ComplexEvent>() {
+        public void flatSelect(Map<String, List<Event>> pattern, Collector<ComplexEvent> out) throws Exception {
+            out.collect(new ComplexEvent());
+        }
+    }
 );
 
 DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
@@ -1594,23 +1576,7 @@ DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag)
 <div data-lang="scala" markdown="1">
 
 {% highlight scala %}
-val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
-
-val outputTag = OutputTag[String]("side-output")
-
-val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
-    (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
-} {
-    pattern: Map[String, Iterable[Event]] => ComplexEvent()
-}
 
-val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
-{% endhighlight %}
-
-The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
-In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`. You can use the collector to emit an arbitrary number of events.
-
-{% highlight scala %}
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
 val outputTag = OutputTag[String]("side-output")
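
The pieces introduced above fit together as follows; a minimal sketch assembled from the
documentation examples in this diff (Event, Alert, TimeoutEvent and createAlertFrom() are the
same placeholder types/helpers the docs use, assumed here rather than part of the commit):

    import org.apache.flink.cep.functions.PatternProcessFunction;
    import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    import java.util.List;
    import java.util.Map;

    public class AlertFunction extends PatternProcessFunction<Event, Alert>
            implements TimedOutPartialMatchHandler<Event> {

        private final OutputTag<TimeoutEvent> timedOutTag;

        public AlertFunction(OutputTag<TimeoutEvent> timedOutTag) {
            this.timedOutTag = timedOutTag;
        }

        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Alert> out) {
            // complete matches go to the main output; ctx also exposes timestamp() and
            // currentProcessingTime(), as described in the updated docs
            out.collect(createAlertFrom(match));
        }

        @Override
        public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) {
            // timed out partial matches can only be emitted through a side output
            ctx.output(timedOutTag, new TimeoutEvent());
        }

        private Alert createAlertFrom(Map<String, List<Event>> match) {
            return new Alert(); // placeholder, as in the docs
        }
    }

Wired against the pattern stream, the main results and the timed out partial matches are then
obtained with:

    OutputTag<TimeoutEvent> timedOutTag = new OutputTag<TimeoutEvent>("timed-out"){};
    SingleOutputStreamOperator<Alert> alerts = patternStream.process(new AlertFunction(timedOutTag));
    DataStream<TimeoutEvent> timedOut = alerts.getSideOutput(timedOutTag);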


[flink] 08/08: [hotfix][cep] Made PatternStream immutable & changed PatternStreamBuilder to a proper builder

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 54fea6072daf2f9868d34994c77ae16930e48f16
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Wed Dec 19 10:10:39 2018 +0100

    [hotfix][cep] Made PatternStream immutable & changed PatternStreamBuilder to a proper builder
---
 .../org/apache/flink/cep/scala/PatternStream.scala |  10 +-
 .../src/main/java/org/apache/flink/cep/CEP.java    |   3 +-
 .../java/org/apache/flink/cep/PatternStream.java   | 133 ++++++++-------------
 .../org/apache/flink/cep/PatternStreamBuilder.java |  79 ++++++++----
 4 files changed, 107 insertions(+), 118 deletions(-)

diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index dd17ea6..d7e7d3f 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -21,9 +21,7 @@ import java.util.{UUID, List => JList, Map => JMap}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.functions.PatternProcessFunction
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
-import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
 import org.apache.flink.util.Collector
 
@@ -42,12 +40,6 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
   private[flink] def wrappedPatternStream = jPatternStream
 
-  def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
-
-  def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream)
-
-  def getComparator: EventComparator[T] = jPatternStream.getComparator
-
   /**
     * Applies a process function to the detected pattern sequence. For each pattern sequence the
     * provided [[PatternProcessFunction]] is called.
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index b7cb391..3774879 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -52,6 +52,7 @@ public class CEP {
 			DataStream<T> input,
 			Pattern<T, ?> pattern,
 			EventComparator<T> comparator) {
-		return new PatternStream<>(input, pattern, comparator);
+		final PatternStream<T> stream = new PatternStream<>(input, pattern);
+		return stream.withComparator(comparator);
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 40508fe..ca918a9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -19,7 +19,6 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -36,6 +35,7 @@ import java.util.UUID;
 
 import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromFlatSelect;
 import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
@@ -50,42 +50,22 @@ import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
  */
 public class PatternStream<T> {
 
-	// underlying data stream
-	private final DataStream<T> inputStream;
+	private final PatternStreamBuilder<T> builder;
 
-	private final Pattern<T, ?> pattern;
-
-	// comparator to sort events
-	private final EventComparator<T> comparator;
-
-	/**
-	 * Side output {@code OutputTag} for late data. If no tag is set late data will be simply
-	 * dropped.
-	 */
-	private OutputTag<T> lateDataOutputTag;
-
-	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
-		this.inputStream = inputStream;
-		this.pattern = pattern;
-		this.comparator = null;
-	}
-
-	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
-		this.inputStream = inputStream;
-		this.pattern = pattern;
-		this.comparator = comparator;
+	private PatternStream(final PatternStreamBuilder<T> builder) {
+		this.builder = checkNotNull(builder);
 	}
 
-	public Pattern<T, ?> getPattern() {
-		return pattern;
+	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
+		this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));
 	}
 
-	public DataStream<T> getInputStream() {
-		return inputStream;
+	PatternStream<T> withComparator(final EventComparator<T> comparator) {
+		return new PatternStream<>(builder.withComparator(comparator));
 	}
 
-	public EventComparator<T> getComparator() {
-		return comparator;
+	public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
+		return new PatternStream<>(builder.withLateDataOutputTag(lateDataOutputTag));
 	}
 
 	/**
@@ -100,16 +80,13 @@ public class PatternStream<T> {
 	 *         function.
 	 */
 	public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction) {
-		// we have to extract the output type from the provided pattern selection function manually
-		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
-
 		final TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
 			patternProcessFunction,
-			PatternSelectFunction.class,
+			PatternProcessFunction.class,
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -132,13 +109,9 @@ public class PatternStream<T> {
 			final PatternProcessFunction<T, R> patternProcessFunction,
 			final TypeInformation<R> outTypeInfo) {
 
-		return PatternStreamBuilder.createPatternStream(
-			inputStream,
-			pattern,
+		return builder.build(
 			outTypeInfo,
-			comparator,
-			lateDataOutputTag,
-			clean(patternProcessFunction));
+			builder.clean(patternProcessFunction));
 	}
 
 	/**
@@ -162,22 +135,13 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
 		return select(patternSelectFunction, returnType);
 	}
 
-	/**
-	 * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
-	 * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
-	 *
-	 * @return The cleaned Function
-	 */
-	private  <F> F clean(F f) {
-		return inputStream.getExecutionEnvironment().clean(f);
-	}
 
 	/**
 	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
@@ -196,7 +160,7 @@ public class PatternStream<T> {
 			final TypeInformation<R> outTypeInfo) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromSelect(clean(patternSelectFunction)).build();
+			fromSelect(builder.clean(patternSelectFunction)).build();
 
 		return process(processFunction, outTypeInfo);
 	}
@@ -236,7 +200,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -279,8 +243,8 @@ public class PatternStream<T> {
 			final PatternSelectFunction<T, R> patternSelectFunction) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromSelect(clean(patternSelectFunction))
-				.withTimeoutHandler(timedOutPartialMatchesTag, clean(patternTimeoutFunction))
+			fromSelect(builder.clean(patternSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternTimeoutFunction))
 				.build();
 
 		return process(processFunction, outTypeInfo);
@@ -319,7 +283,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -329,7 +293,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -338,17 +302,17 @@ public class PatternStream<T> {
 		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timeoutTypeInfo);
 
 		final PatternProcessFunction<T, R> processFunction =
-				fromSelect(clean(patternSelectFunction))
-						.withTimeoutHandler(outputTag, clean(patternTimeoutFunction))
-						.build();
+			fromSelect(builder.clean(patternSelectFunction))
+				.withTimeoutHandler(outputTag, builder.clean(patternTimeoutFunction))
+				.build();
 
 		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
 		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
 
 		return mainStream
-				.connect(timedOutStream)
-				.map(new CoMapTimeout<>())
-				.returns(outTypeInfo);
+			.connect(timedOutStream)
+			.map(new CoMapTimeout<>())
+			.returns(outTypeInfo);
 	}
 
 	/**
@@ -362,17 +326,17 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
-	public <R> SingleOutputStreamOperator<R> flatSelect(
-			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+	public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
+
 		final TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[] {1, 0},
-			inputStream.getType(),
+			new int[]{1, 0},
+			builder.getInputType(),
 			null,
 			false);
 
@@ -396,7 +360,8 @@ public class PatternStream<T> {
 			final TypeInformation<R> outTypeInfo) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromFlatSelect(clean(patternFlatSelectFunction)).build();
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.build();
 
 		return process(processFunction, outTypeInfo);
 	}
@@ -436,11 +401,12 @@ public class PatternStream<T> {
 			0,
 			1,
 			new int[]{1, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
-		return flatSelect(timedOutPartialMatchesTag,
+		return flatSelect(
+			timedOutPartialMatchesTag,
 			patternFlatTimeoutFunction,
 			rightTypeInfo,
 			patternFlatSelectFunction);
@@ -478,8 +444,8 @@ public class PatternStream<T> {
 			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromFlatSelect(clean(patternFlatSelectFunction))
-				.withTimeoutHandler(timedOutPartialMatchesTag, clean(patternFlatTimeoutFunction))
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternFlatTimeoutFunction))
 				.build();
 
 		return process(processFunction, outTypeInfo);
@@ -519,7 +485,7 @@ public class PatternStream<T> {
 			0,
 			1,
 			new int[]{2, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
@@ -529,15 +495,15 @@ public class PatternStream<T> {
 			0,
 			1,
 			new int[]{1, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
 		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timedOutTypeInfo);
 
 		final PatternProcessFunction<T, R> processFunction =
-			fromFlatSelect(clean(patternFlatSelectFunction))
-				.withTimeoutHandler(outputTag, clean(patternFlatTimeoutFunction))
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.withTimeoutHandler(outputTag, builder.clean(patternFlatTimeoutFunction))
 				.build();
 
 		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
@@ -545,14 +511,9 @@ public class PatternStream<T> {
 		final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timedOutTypeInfo, mainTypeInfo);
 
 		return mainStream
-			.connect(timedOutStream)
-			.map(new CoMapTimeout<>())
-			.returns(outTypeInfo);
-	}
-
-	public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
-		this.lateDataOutputTag = clean(lateDataOutputTag);
-		return this;
+				.connect(timedOutStream)
+				.map(new CoMapTimeout<>())
+				.returns(outTypeInfo);
 	}
 
 	/**
@@ -564,12 +525,12 @@ public class PatternStream<T> {
 		private static final long serialVersionUID = 2059391566945212552L;
 
 		@Override
-		public Either<L, R> map1(R value) throws Exception {
+		public Either<L, R> map1(R value) {
 			return Either.Right(value);
 		}
 
 		@Override
-		public Either<L, R> map2(L value) throws Exception {
+		public Either<L, R> map2(L value) {
 			return Either.Left(value);
 		}
 	}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
index a2c430a..13d68e2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
@@ -19,6 +19,7 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -45,46 +46,77 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * Utility method for creating {@link PatternStream}.
  */
 @Internal
-final class PatternStreamBuilder {
+final class PatternStreamBuilder<IN> {
+
+	private final DataStream<IN> inputStream;
+
+	private final Pattern<IN, ?> pattern;
+
+	private final EventComparator<IN> comparator;
+
+	/**
+	 * Side output {@code OutputTag} for late data.
+	 * If no tag is set late data will be simply dropped.
+	 */
+	private final OutputTag<IN> lateDataOutputTag;
+
+	private PatternStreamBuilder(
+			final DataStream<IN> inputStream,
+			final Pattern<IN, ?> pattern,
+			@Nullable final EventComparator<IN> comparator,
+			@Nullable final OutputTag<IN> lateDataOutputTag) {
+
+		this.inputStream = checkNotNull(inputStream);
+		this.pattern = checkNotNull(pattern);
+		this.comparator = comparator;
+		this.lateDataOutputTag = lateDataOutputTag;
+	}
+
+	TypeInformation<IN> getInputType() {
+		return inputStream.getType();
+	}
+
+	/**
+	 * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
+	 * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
+	 *
+	 * @return The cleaned Function
+	 */
+	<F> F clean(F f) {
+		return inputStream.getExecutionEnvironment().clean(f);
+	}
+
+	PatternStreamBuilder<IN> withComparator(final EventComparator<IN> comparator) {
+		return new PatternStreamBuilder<>(inputStream, pattern, checkNotNull(comparator), lateDataOutputTag);
+	}
+
+	PatternStreamBuilder<IN> withLateDataOutputTag(final OutputTag<IN> lateDataOutputTag) {
+		return new PatternStreamBuilder<>(inputStream, pattern, comparator, checkNotNull(lateDataOutputTag));
+	}
 
 	/**
 	 * Creates a data stream containing results of {@link PatternProcessFunction} to fully matching event patterns.
 	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
 	 * @param processFunction function to be applied to matching event sequences
 	 * @param outTypeInfo output TypeInformation of
 	 *        {@link PatternProcessFunction#processMatch(Map, PatternProcessFunction.Context, Collector)}
-	 * @param <IN> type of input events
 	 * @param <OUT> type of output events
 	 * @return Data stream containing fully matched event sequence with applied {@link PatternProcessFunction}
 	 */
-	static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
+	<OUT, K> SingleOutputStreamOperator<OUT> build(
 			final TypeInformation<OUT> outTypeInfo,
-			@Nullable final EventComparator<IN> comparator,
-			@Nullable final OutputTag<IN> lateDataOutputTag,
 			final PatternProcessFunction<IN, OUT> processFunction) {
 
-		checkNotNull(inputStream);
-		checkNotNull(pattern);
 		checkNotNull(outTypeInfo);
 		checkNotNull(processFunction);
 
-		final TypeSerializer<IN> inputSerializer =
-				inputStream.getType().createSerializer(inputStream.getExecutionConfig());
-
-		// check whether we use processing time
-		final boolean isProcessingTime =
-			inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+		final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+		final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
 
 		final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
-
-		// compile our pattern into a NFAFactory to instantiate NFAs later on
 		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
 
-		CepOperator<IN, K, OUT> operator = new CepOperator<>(
+		final CepOperator<IN, K, OUT> operator = new CepOperator<>(
 			inputSerializer,
 			isProcessingTime,
 			nfaFactory,
@@ -108,12 +140,15 @@ final class PatternStreamBuilder {
 				"GlobalCepOperator",
 				outTypeInfo,
 				operator
-				).forceNonParallel();
+			).forceNonParallel();
 		}
 
 		return patternStream;
 	}
 
-	private PatternStreamBuilder() {
+	// ---------------------------------------- factory-like methods ---------------------------------------- //
+
+	static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
+		return new PatternStreamBuilder<>(inputStream, pattern, null, null);
 	}
 }
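
In practice the construction path now reads as a chain of immutable steps. A rough sketch of the
internal flow after this commit, using only the package-private methods visible in the diff above
(input, pattern, comparator, lateDataTag, outTypeInfo and processFunction are illustrative variables):

    // every configuration step returns a fresh builder (and a fresh PatternStream wrapping it)
    // instead of mutating fields on the stream
    PatternStreamBuilder<Event> builder =
        PatternStreamBuilder.forStreamAndPattern(input, pattern)
            .withComparator(comparator)            // new builder with the comparator set
            .withLateDataOutputTag(lateDataTag);   // new builder with the late-data tag set

    SingleOutputStreamOperator<Alert> result =
        builder.build(outTypeInfo, builder.clean(processFunction));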


[flink] 04/08: [hotfix][cep] Added test harness output asserter

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92eb82b35e855482ebe0966fd11af10eb8f8287b
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:33:36 2018 +0100

    [hotfix][cep] Added test harness output asserter
---
 .../org/apache/flink/cep/utils/OutputAsserter.java | 78 ++++++++++++++++++++++
 1 file changed, 78 insertions(+)

diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java
new file mode 100644
index 0000000..63cfa15
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import java.util.Queue;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Asserter for output from {@link OneInputStreamOperatorTestHarness}.
+ */
+public class OutputAsserter {
+
+	private final Queue<?> output;
+
+	private OutputAsserter(Queue<?> output) {
+		this.output = output;
+	}
+
+	public static OutputAsserter assertOutput(Queue<?> output) {
+		return new OutputAsserter(output);
+	}
+
+	private AssertionError fail(Object record) {
+		return new AssertionError("Received unexpected element: " + record);
+	}
+
+	public <T> OutputAsserter nextElementEquals(T expected) {
+		final Object record = output.poll();
+		final Object actual;
+		if (record instanceof StreamRecord) {
+			// This is in case we assert side output
+			actual = ((StreamRecord) record).getValue();
+		} else {
+			// This is in case we assert the main output
+			actual = record;
+		}
+		assertThat(actual, is(expected));
+		return this;
+	}
+
+	public void hasNoMoreElements() {
+		assertTrue(output.isEmpty());
+	}
+
+	public OutputAsserter watermarkEquals(long timestamp) {
+		Object record = output.poll();
+		if (record instanceof Watermark) {
+			Watermark watermark = (Watermark) record;
+			assertThat(watermark.getTimestamp(), is(timestamp));
+		} else {
+			throw fail(record);
+		}
+		return this;
+	}
+}
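
In a test this reads as a small fluent chain over the harness output queues, e.g. (a sketch; the
harness, the expected values and the side-output tag are assumed):

    import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;

    assertOutput(harness.getOutput())
        .nextElementEquals(expectedAlert)   // StreamRecord values are unwrapped before comparison
        .watermarkEquals(5L)                // watermarks are matched on their timestamp
        .hasNoMoreElements();

    assertOutput(harness.getSideOutput(timedOutTag))
        .nextElementEquals(expectedTimedOutMatch)
        .hasNoMoreElements();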


[flink] 01/08: [hotfix][cep] Fixed timestamp for timed out partial matches

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1ccc740dcb588e324c33571df8d7475c09bf962a
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:31:31 2018 +0100

    [hotfix][cep] Fixed timestamp for timed out partial matches
---
 .../flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java         | 2 +-
 .../flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 01dcfd9..190727f 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -249,7 +249,7 @@ public class NFA<T> {
 					Map<String, List<T>> timedOutPattern = sharedBufferAccessor.materializeMatch(extractCurrentMatches(
 						sharedBufferAccessor,
 						computationState));
-					timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
+					timeoutResult.add(Tuple2.of(timedOutPattern, computationState.getStartTimestamp() + windowTime));
 				}
 
 				sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 6ffd5d2..2391d54 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -378,9 +378,9 @@ public class NFAITCase extends TestLogger {
 		timeoutPattern4.put("start", Collections.singletonList(new Event(2, "start", 1.0)));
 
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern1, 11L));
-		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 13L));
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 12L));
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern3, 11L));
-		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 13L));
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 12L));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7907391379273505897L;
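
The effect of the one-line change in NFA above: a timed out partial match is now reported at the
moment its window actually elapsed, i.e. timestamp = computationState.getStartTimestamp() + windowTime,
rather than at the timestamp of the element that triggered the pruning. The adjusted expectations in
NFAITCase (13L becoming 12L) are consistent with that formula.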


[flink] 03/08: [hotfix][cep] Added test event builder

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6f5b9c93c50ef019fb491145e0b394de11631aea
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:33:15 2018 +0100

    [hotfix][cep] Added test event builder
---
 .../org/apache/flink/cep/utils/EventBuilder.java   | 72 ++++++++++++++++++++++
 1 file changed, 72 insertions(+)

diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/EventBuilder.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/EventBuilder.java
new file mode 100644
index 0000000..6915de0
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/EventBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Builder for {@link Event} that is used in cep tests.
+ */
+public class EventBuilder {
+
+	private final int id;
+	private final double price;
+	private final String name;
+	private final Long timestamp;
+
+	private EventBuilder(int id, double price, String name, Long timestamp) {
+		this.id = id;
+		this.price = price;
+		this.name = name;
+		this.timestamp = timestamp;
+	}
+
+	public static EventBuilder event() {
+		return new EventBuilder(0, 0.0, "", null);
+	}
+
+	public EventBuilder withId(int id) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public EventBuilder withPrice(double price) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public EventBuilder withName(String name) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public EventBuilder withTimestamp(long timestamp) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public StreamRecord<Event> asStreamRecord() {
+		if (timestamp != null) {
+			return new StreamRecord<>(build(), timestamp);
+		} else {
+			return new StreamRecord<>(build());
+		}
+	}
+
+	public Event build() {
+		return new Event(id, name, price);
+	}
+}
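
Used in a test, the builder reads roughly like this (a sketch; the field values are illustrative):

    import static org.apache.flink.cep.utils.EventBuilder.event;

    // a plain Event
    Event startEvent = event().withId(42).withName("start").withPrice(3.0).build();

    // or wrapped in a StreamRecord; the record carries a timestamp only if withTimestamp() was set
    StreamRecord<Event> record = event().withName("middle").withTimestamp(5L).asStreamRecord();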


[flink] 06/08: [hotfix][cep] Fixed passing runtime context

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bec4a2885102687fd947a31870e5e1a1efb37ba8
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:35:40 2018 +0100

    [hotfix][cep] Fixed passing runtime context
---
 .../flink/cep/RichPatternFlatSelectFunction.java   |  15 +-
 .../flink/cep/RichPatternSelectFunction.java       |  15 +-
 .../functions/RichPatternFlatSelectFunction.java   |  45 ------
 .../cep/functions/RichPatternSelectFunction.java   |  44 ------
 .../main/java/org/apache/flink/cep/nfa/NFA.java    |  34 +++-
 .../org/apache/flink/cep/operator/CepOperator.java |  43 ++---
 .../cep/{ => operator}/CepRuntimeContext.java      |  54 ++++---
 .../pattern/conditions/RichIterativeCondition.java |   8 +-
 .../operator/CepProcessFunctionContextTest.java    |  19 ++-
 .../cep/{ => operator}/CepRuntimeContextTest.java  | 174 +++++++++++----------
 10 files changed, 188 insertions(+), 263 deletions(-)

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
index a2b89c3..7ac199e 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
@@ -20,9 +20,7 @@ package org.apache.flink.cep;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -42,16 +40,5 @@ public abstract class RichPatternFlatSelectFunction<IN, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	@Override
-	public void setRuntimeContext(RuntimeContext runtimeContext) {
-		Preconditions.checkNotNull(runtimeContext);
-
-		if (runtimeContext instanceof CepRuntimeContext) {
-			super.setRuntimeContext(runtimeContext);
-		} else {
-			super.setRuntimeContext(new CepRuntimeContext(runtimeContext));
-		}
-	}
-
-	public abstract void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
+	public abstract void flatSelect(final Map<String, List<IN>> pattern, final Collector<OUT> out) throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
index ce694a3..a907e2b 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
@@ -20,8 +20,6 @@ package org.apache.flink.cep;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -41,16 +39,5 @@ public abstract class RichPatternSelectFunction<IN, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	@Override
-	public void setRuntimeContext(RuntimeContext runtimeContext) {
-		Preconditions.checkNotNull(runtimeContext);
-
-		if (runtimeContext instanceof CepRuntimeContext) {
-			super.setRuntimeContext(runtimeContext);
-		} else {
-			super.setRuntimeContext(new CepRuntimeContext(runtimeContext));
-		}
-	}
-
-	public abstract OUT select(Map<String, List<IN>> pattern) throws Exception;
+	public abstract OUT select(final Map<String, List<IN>> pattern) throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternFlatSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternFlatSelectFunction.java
deleted file mode 100644
index fdc5640..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternFlatSelectFunction.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.util.Collector;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
- * {@link RichFunction#close()}.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type of the output element
- */
-public abstract class RichPatternFlatSelectFunction<IN, OUT>
-		extends AbstractRichFunction
-		implements PatternFlatSelectFunction<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract void flatSelect(final Map<String, List<IN>> pattern, final Collector<OUT> out) throws Exception;
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternSelectFunction.java
deleted file mode 100644
index a4bf327..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternSelectFunction.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.functions;
-
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.cep.PatternSelectFunction;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Rich variant of the {@link PatternSelectFunction}. As a {@link RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
- * {@link RichFunction#close()}.
- *
- * @param <IN> Type of the input elements
- * @param <OUT> Type of the output element
- */
-public abstract class RichPatternSelectFunction<IN, OUT>
-		extends AbstractRichFunction
-		implements PatternSelectFunction<IN, OUT> {
-
-	private static final long serialVersionUID = 1L;
-
-	public abstract OUT select(final Map<String, List<IN>> pattern) throws Exception;
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index b1ddb85..48bb587 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -36,6 +38,7 @@ import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -169,6 +172,34 @@ public class NFA<T> {
 	}
 
 	/**
+	 * Initialization method for the NFA. It is called before any element is passed and thus suitable for one time setup
+	 * work.
+	 * @param cepRuntimeContext runtime context of the enclosing operator
+	 * @param conf The configuration containing the parameters attached to the contract.
+	 */
+	public void open(RuntimeContext cepRuntimeContext, Configuration conf) throws Exception {
+		for (State<T> state : getStates()) {
+			for (StateTransition<T> transition : state.getStateTransitions()) {
+				IterativeCondition condition = transition.getCondition();
+				FunctionUtils.setFunctionRuntimeContext(condition, cepRuntimeContext);
+				FunctionUtils.openFunction(condition, conf);
+			}
+		}
+	}
+
+	/**
+	 * Tear-down method for the NFA.
+	 */
+	public void close() throws Exception {
+		for (State<T> state : getStates()) {
+			for (StateTransition<T> transition : state.getStateTransitions()) {
+				IterativeCondition condition = transition.getCondition();
+				FunctionUtils.closeFunction(condition);
+			}
+		}
+	}
+
+	/**
 	 * Processes the next input event. If some of the computations reach a final state then the
 	 * resulting event sequences are returned. If computations time out and timeout handling is
 	 * activated, then the timed out event patterns are returned.
@@ -224,7 +255,7 @@ public class NFA<T> {
 
 	/**
 	 * Prunes states assuming there will be no events with timestamp <b>lower</b> than the given one.
-	 * It cleares the sharedBuffer and also emits all timed out partial matches.
+	 * It clears the sharedBuffer and also emits all timed out partial matches.
 	 *
 	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
 	 * @param nfaState     The NFAState object that we need to affect while processing
@@ -264,7 +295,6 @@ public class NFA<T> {
 		sharedBufferAccessor.advanceTime(timestamp);
 
 		return timeoutResult;
-
 	}
 
 	private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
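
A minimal sketch of how an enclosing component is expected to drive the new NFA open()/close() hooks introduced above (illustrative only; the NfaLifecycleSketch class, the pattern name and the test-scope Event type are placeholders, not part of this change):

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.cep.Event;
    import org.apache.flink.cep.nfa.NFA;
    import org.apache.flink.cep.nfa.compiler.NFACompiler;
    import org.apache.flink.cep.pattern.Pattern;
    import org.apache.flink.configuration.Configuration;

    class NfaLifecycleSketch {

        // Creates an NFA and runs the new open()/close() hooks around element processing.
        static void runLifecycle(RuntimeContext runtimeContext) throws Exception {
            NFA<Event> nfa = NFACompiler.compileFactory(Pattern.<Event>begin("start"), false).createNFA();

            // One-time setup: wires the runtime context into every IterativeCondition.
            nfa.open(runtimeContext, new Configuration());
            try {
                // ... feed elements / advance time on the NFA here ...
            } finally {
                // Tear-down: closes all rich conditions registered on the state transitions.
                nfa.close();
            }
        }
    }
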
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index 0fd7053..6df1fa0 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -36,25 +36,25 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.NFA.MigratedNFA;
 import org.apache.flink.cep.nfa.NFAState;
 import org.apache.flink.cep.nfa.NFAStateSerializer;
-import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
@@ -127,6 +127,9 @@ public class CepOperator<IN, KEY, OUT>
 	/** Main output collector, that sets a proper timestamp to the StreamRecord. */
 	private transient TimestampedCollector<OUT> collector;
 
+	/** Wrapped RuntimeContext that limits the underlying context features. */
+	private transient CepRuntimeContext cepRuntimeContext;
+
 	public CepOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
@@ -152,6 +155,13 @@ public class CepOperator<IN, KEY, OUT>
 	}
 
 	@Override
+	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+		super.setup(containingTask, config, output);
+		this.cepRuntimeContext = new CepRuntimeContext(getRuntimeContext());
+		FunctionUtils.setFunctionRuntimeContext(getUserFunction(), this.cepRuntimeContext);
+	}
+
+	@Override
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
 
@@ -201,8 +211,8 @@ public class CepOperator<IN, KEY, OUT>
 				VoidNamespaceSerializer.INSTANCE,
 				this);
 
-		this.nfa = nfaFactory.createNFA();
-		openNFA(nfa);
+		nfa = nfaFactory.createNFA();
+		nfa.open(cepRuntimeContext, new Configuration());
 
 		context = new ContextFunctionImpl();
 		collector = new TimestampedCollector<>(output);
@@ -211,8 +221,7 @@ public class CepOperator<IN, KEY, OUT>
 	@Override
 	public void close() throws Exception {
 		super.close();
-
-		closeNFA(nfa);
+		nfa.close();
 	}
 
 	@Override
@@ -498,26 +507,6 @@ public class CepOperator<IN, KEY, OUT>
 		}
 	}
 
-	private void openNFA(NFA<IN> nfa) throws Exception {
-		Configuration conf = new Configuration();
-		for (State<IN> state : nfa.getStates()) {
-			for (StateTransition<IN> transition : state.getStateTransitions()) {
-				IterativeCondition condition = transition.getCondition();
-				FunctionUtils.setFunctionRuntimeContext(condition, getRuntimeContext());
-				FunctionUtils.openFunction(condition, conf);
-			}
-		}
-	}
-
-	private void closeNFA(NFA<IN> nfa) throws Exception {
-		for (State<IN> state : nfa.getStates()) {
-			for (StateTransition<IN> transition : state.getStateTransitions()) {
-				IterativeCondition condition = transition.getCondition();
-				FunctionUtils.closeFunction(condition);
-			}
-		}
-	}
-
 	//////////////////////			Testing Methods			//////////////////////
 
 	@VisibleForTesting
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
similarity index 75%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
rename to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
index ed93f8e..0518dad 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep;
+package org.apache.flink.cep.operator;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -45,17 +46,23 @@ import java.io.Serializable;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}.
- * The runtime context only supports basic operations. Consequently, state access, accumulators,
- * broadcast variables and the distributed cache are disabled.
+ * A wrapper class for the {@link RuntimeContext}.
+ *
+ * <p>This context only exposes the functionality needed by the
+ * pattern process function and iterative condition function.
+ * Consequently, state access, accumulators, broadcast variables
+ * and the distributed cache are disabled.
  */
-public class CepRuntimeContext implements RuntimeContext {
+@Internal
+class CepRuntimeContext implements RuntimeContext {
 
 	private final RuntimeContext runtimeContext;
 
-	public CepRuntimeContext(RuntimeContext runtimeContext) {
-		this.runtimeContext = runtimeContext;
+	CepRuntimeContext(final RuntimeContext runtimeContext) {
+		this.runtimeContext = checkNotNull(runtimeContext);
 	}
 
 	@Override
@@ -114,12 +121,13 @@ public class CepRuntimeContext implements RuntimeContext {
 
 	@Override
 	public <V, A extends Serializable> void addAccumulator(
-		String name, Accumulator<V, A> accumulator) {
+			final String name,
+			final Accumulator<V, A> accumulator) {
 		throw new UnsupportedOperationException("Accumulators are not supported.");
 	}
 
 	@Override
-	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
+	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(final String name) {
 		throw new UnsupportedOperationException("Accumulators are not supported.");
 	}
 
@@ -129,68 +137,70 @@ public class CepRuntimeContext implements RuntimeContext {
 	}
 
 	@Override
-	public IntCounter getIntCounter(String name) {
+	public IntCounter getIntCounter(final String name) {
 		throw new UnsupportedOperationException("Int counters are not supported.");
 	}
 
 	@Override
-	public LongCounter getLongCounter(String name) {
+	public LongCounter getLongCounter(final String name) {
 		throw new UnsupportedOperationException("Long counters are not supported.");
 	}
 
 	@Override
-	public DoubleCounter getDoubleCounter(String name) {
+	public DoubleCounter getDoubleCounter(final String name) {
 		throw new UnsupportedOperationException("Double counters are not supported.");
 	}
 
 	@Override
-	public Histogram getHistogram(String name) {
+	public Histogram getHistogram(final String name) {
 		throw new UnsupportedOperationException("Histograms are not supported.");
 	}
 
 	@Override
-	public boolean hasBroadcastVariable(String name) {
+	public boolean hasBroadcastVariable(final String name) {
 		throw new UnsupportedOperationException("Broadcast variables are not supported.");
 	}
 
 	@Override
-	public <RT> List<RT> getBroadcastVariable(String name) {
+	public <RT> List<RT> getBroadcastVariable(final String name) {
 		throw new UnsupportedOperationException("Broadcast variables are not supported.");
 	}
 
 	@Override
 	public <T, C> C getBroadcastVariableWithInitializer(
-		String name, BroadcastVariableInitializer<T, C> initializer) {
+			final String name,
+			final BroadcastVariableInitializer<T, C> initializer) {
 		throw new UnsupportedOperationException("Broadcast variables are not supported.");
 	}
 
 	@Override
-	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+	public <T> ValueState<T> getState(final ValueStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+	public <T> ListState<T> getListState(final ListStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+	public <T> ReducingState<T> getReducingState(final ReducingStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+	public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
+			final AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+	public <T, ACC> FoldingState<T, ACC> getFoldingState(final FoldingStateDescriptor<T, ACC> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+	public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 }
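
A hedged sketch of what user code observes under the wrapped context (illustrative only; SketchCondition is not part of this change): plain task metadata is forwarded, while state, accumulators, broadcast variables and the distributed cache throw UnsupportedOperationException.

    import org.apache.flink.cep.Event;
    import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
    import org.apache.flink.configuration.Configuration;

    class SketchCondition extends RichIterativeCondition<Event> {

        private int parallelism;

        @Override
        public void open(Configuration parameters) {
            // Forwarded by CepRuntimeContext: plain task metadata.
            parallelism = getRuntimeContext().getNumberOfParallelSubtasks();
            // Disabled by CepRuntimeContext: e.g. getRuntimeContext().getState(...)
            // would throw UnsupportedOperationException("State is not supported.").
        }

        @Override
        public boolean filter(Event value, Context<Event> ctx) {
            return parallelism > 0;
        }
    }
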
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
index 12f017e..da20321 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
@@ -21,7 +21,6 @@ package org.apache.flink.cep.pattern.conditions;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.cep.CepRuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
@@ -46,12 +45,7 @@ public abstract class RichIterativeCondition<T>
 	@Override
 	public void setRuntimeContext(RuntimeContext runtimeContext) {
 		Preconditions.checkNotNull(runtimeContext);
-
-		if (runtimeContext instanceof CepRuntimeContext) {
-			this.runtimeContext = runtimeContext;
-		} else {
-			this.runtimeContext = new CepRuntimeContext(runtimeContext);
-		}
+		this.runtimeContext = runtimeContext;
 	}
 
 	@Override
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
index f35d08e..0947748 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -45,6 +45,9 @@ import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
  */
 public class CepProcessFunctionContextTest extends TestLogger {
 
+	private static final boolean PROCESSING_TIME = false;
+	private static final boolean EVENT_TIME = true;
+
 	@Test
 	public void testTimestampPassingInEventTime() throws Exception {
 
@@ -53,7 +56,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(1),
 					new NFAForwardingFactory(),
-					false))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -78,7 +81,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(1),
 					new NFAForwardingFactory(),
-					true))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
@@ -99,7 +102,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(1),
 					new NFAForwardingFactory(),
-					true))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(15);
@@ -122,7 +125,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(1),
 					new NFAForwardingFactory(),
-					false))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(10);
@@ -147,7 +150,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(2, timedOut),
 					new NFATimingOutFactory(),
-					false))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -178,7 +181,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractTimestampAndNames(2, timedOut),
 					new NFATimingOutFactory(),
-					true))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(3);
@@ -208,7 +211,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
 					new NFATimingOutFactory(),
-					false))) {
+					PROCESSING_TIME))) {
 			harness.open();
 
 			// events out of order to test if internal sorting does not mess up the timestamps
@@ -240,7 +243,7 @@ public class CepProcessFunctionContextTest extends TestLogger {
 				createCepOperator(
 					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
 					new NFATimingOutFactory(),
-					true))) {
+					EVENT_TIME))) {
 			harness.open();
 
 			harness.setProcessingTime(3);
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
similarity index 66%
rename from flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
index ef7ee898..b77dd94 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep;
+package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.AggregateFunction;
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -33,100 +32,84 @@ import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.cep.pattern.conditions.RichAndCondition;
-import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
-import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
-import org.apache.flink.cep.pattern.conditions.RichNotCondition;
-import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cep.operator.CepOperatorTestUtilities.getCepTestHarness;
+import static org.apache.flink.cep.operator.CepRuntimeContextTest.MockProcessFunctionAsserter.assertFunction;
+import static org.apache.flink.cep.utils.CepOperatorBuilder.createOperatorForNFA;
+import static org.apache.flink.cep.utils.EventBuilder.event;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
  * Test cases for {@link CepRuntimeContext}.
  */
-public class CepRuntimeContextTest {
+public class CepRuntimeContextTest extends TestLogger {
 
 	@Test
-	public void testRichCompositeIterativeCondition() throws Exception {
-		RichIterativeCondition<Integer> first = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> second = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> third = new TestRichIterativeCondition();
-
-		RichCompositeIterativeCondition function = new RichCompositeIterativeCondition(first, second, third) {
-			@Override
-			public boolean filter(Object value, Context ctx) throws Exception {
-				return false;
-			}
-		};
-		function.setRuntimeContext(mock(RuntimeContext.class));
-
-		assertTrue(first.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(second.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(third.getRuntimeContext() instanceof CepRuntimeContext);
-	}
+	public void testCepRuntimeContextIsSetInNFA() throws Exception {
 
-	@Test
-	public void testRichAndCondition() throws Exception {
-		RichIterativeCondition<Integer> left = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> right = new TestRichIterativeCondition();
+		@SuppressWarnings("unchecked")
+		final NFA<Event> mockNFA = mock(NFA.class);
 
-		RichAndCondition function = new RichAndCondition<>(left, right);
-		function.setRuntimeContext(mock(RuntimeContext.class));
+		try (
+			OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(
+				createOperatorForNFA(mockNFA).build())) {
 
-		assertTrue(left.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(right.getRuntimeContext() instanceof CepRuntimeContext);
+			harness.open();
+			verify(mockNFA).open(any(CepRuntimeContext.class), any(Configuration.class));
+		}
 	}
 
 	@Test
-	public void testRichOrCondition() throws Exception {
-		RichIterativeCondition<Integer> left = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> right = new TestRichIterativeCondition();
-
-		RichOrCondition function = new RichOrCondition<>(left, right);
-		function.setRuntimeContext(mock(RuntimeContext.class));
-
-		assertTrue(left.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(right.getRuntimeContext() instanceof CepRuntimeContext);
-	}
+	public void testCepRuntimeContextIsSetInProcessFunction() throws Exception {
 
-	@Test
-	public void testRichNotCondition() {
-		RichIterativeCondition<Integer> original = new TestRichIterativeCondition();
+		final VerifyRuntimeContextProcessFunction processFunction = new VerifyRuntimeContextProcessFunction();
 
-		RichNotCondition function = new RichNotCondition<>(original);
-		function.setRuntimeContext(mock(RuntimeContext.class));
+		try (
+			OneInputStreamOperatorTestHarness<Event, Event> harness = getCepTestHarness(
+				createOperatorForNFA(getSingleElementAlwaysTrueNFA())
+					.withFunction(processFunction)
+					.build())) {
 
-		assertTrue(original.getRuntimeContext() instanceof CepRuntimeContext);
-	}
+			harness.open();
+			Event record = event().withName("A").build();
+			harness.processElement(record, 0);
 
-	@Test
-	public void testRichPatternSelectFunction() {
-		verifyRuntimeContext(new TestRichPatternSelectFunction());
+			assertFunction(processFunction)
+				.checkOpenCalled()
+				.checkCloseCalled()
+				.checkProcessMatchCalled();
+		}
 	}
 
-	@Test
-	public void testRichPatternFlatSelectFunction() {
-		verifyRuntimeContext(new TestRichPatternFlatSelectFunction());
+	private NFA<Event> getSingleElementAlwaysTrueNFA() {
+		return NFACompiler.compileFactory(Pattern.<Event>begin("A"), false).createNFA();
 	}
 
 	@Test
-	public void testRichIterativeCondition() {
-		verifyRuntimeContext(new TestRichIterativeCondition());
-	}
-
-	private void verifyRuntimeContext(final RichFunction function) {
+	public void testCepRuntimeContext() {
 		final String taskName = "foobarTask";
 		final MetricGroup metricGroup = new UnregisteredMetricsGroup();
 		final int numberOfParallelSubtasks = 42;
@@ -149,11 +132,8 @@ public class CepRuntimeContextTest {
 		when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
 		when(mockedRuntimeContext.getDistributedCache()).thenReturn(distributedCache);
 
-		function.setRuntimeContext(mockedRuntimeContext);
-
-		RuntimeContext runtimeContext = function.getRuntimeContext();
+		RuntimeContext runtimeContext = new CepRuntimeContext(mockedRuntimeContext);
 
-		assertTrue(runtimeContext instanceof CepRuntimeContext);
 		assertEquals(taskName, runtimeContext.getTaskName());
 		assertEquals(metricGroup, runtimeContext.getMetricGroup());
 		assertEquals(numberOfParallelSubtasks, runtimeContext.getNumberOfParallelSubtasks());
@@ -289,30 +269,64 @@ public class CepRuntimeContextTest {
 		}
 	}
 
-	private static class TestRichIterativeCondition extends RichIterativeCondition<Integer> {
-		private static final long serialVersionUID = 1L;
+	/* Test Utils */
+	static class MockProcessFunctionAsserter {
+		private final VerifyRuntimeContextProcessFunction function;
 
-		@Override
-		public boolean filter(Integer value, Context<Integer> ctx) throws Exception {
-			return false;
+		static MockProcessFunctionAsserter assertFunction(VerifyRuntimeContextProcessFunction function) {
+			return new MockProcessFunctionAsserter(function);
+		}
+
+		private MockProcessFunctionAsserter(VerifyRuntimeContextProcessFunction function) {
+			this.function = function;
+		}
+
+		MockProcessFunctionAsserter checkOpenCalled() {
+			assertThat(function.openCalled, is(true));
+			return this;
+		}
+
+		MockProcessFunctionAsserter checkCloseCalled() {
+			assertThat(function.closeCalled, is(true));
+			return this;
+		}
+
+		MockProcessFunctionAsserter checkProcessMatchCalled() {
+			assertThat(function.processMatchCalled, is(true));
+			return this;
 		}
 	}
 
-	private static class TestRichPatternSelectFunction extends RichPatternSelectFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
+	private static class VerifyRuntimeContextProcessFunction extends PatternProcessFunction<Event, Event> {
+
+		boolean openCalled = false;
+		boolean closeCalled = false;
+		boolean processMatchCalled = false;
 
 		@Override
-		public Integer select(Map<String, List<Integer>> pattern) throws Exception {
-			return null;
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			verifyContext();
+			openCalled = true;
+		}
+
+		private void verifyContext() {
+			if (!(getRuntimeContext() instanceof CepRuntimeContext)) {
+				fail("Runtime context was not wrapped in CepRuntimeContext");
+			}
 		}
-	}
 
-	private static class TestRichPatternFlatSelectFunction extends RichPatternFlatSelectFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
+		@Override
+		public void close() throws Exception {
+			super.close();
+			verifyContext();
+			closeCalled = true;
+		}
 
 		@Override
-		public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
-			// no op
+		public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Event> out) throws Exception {
+			verifyContext();
+			processMatchCalled = true;
 		}
 	}
 }
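
For context, a user-facing function like the one verified above could look roughly as follows (illustrative sketch, not part of this change; AlertingFunction and the String output type are made up):

    import java.util.List;
    import java.util.Map;

    import org.apache.flink.cep.Event;
    import org.apache.flink.cep.functions.PatternProcessFunction;
    import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
    import org.apache.flink.util.Collector;

    class AlertingFunction extends PatternProcessFunction<Event, String>
            implements TimedOutPartialMatchHandler<Event> {

        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) {
            // Called once per complete match; the context exposes timestamp(),
            // currentProcessingTime() and side outputs via output(...).
            out.collect("matched " + match.keySet() + " at " + ctx.timestamp());
        }

        @Override
        public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) {
            // Timed out partial matches are typically routed to a side output via ctx.output(...).
        }
    }
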


[flink] 05/08: [FLINK-10596][cep] Introduced PatternProcessFunction

Posted by dw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 44968096046a316c6682ef84d12b9c60d7725df7
Author: Dawid Wysakowicz <dw...@apache.org>
AuthorDate: Tue Dec 18 09:29:02 2018 +0100

    [FLINK-10596][cep] Introduced PatternProcessFunction
---
 .../org/apache/flink/cep/scala/PatternStream.scala |  17 +-
 ...ernStreamScalaJavaAPIInteroperabilityTest.scala |  95 +++--
 .../flink/cep/PatternProcessFunctionBuilder.java   | 147 +++++++
 .../java/org/apache/flink/cep/PatternStream.java   | 229 ++++++-----
 .../org/apache/flink/cep/PatternStreamBuilder.java | 119 ++++++
 .../org/apache/flink/cep/context/TimerContext.java |  43 +++
 .../cep/functions/PatternProcessFunction.java      |  76 ++++
 .../functions/RichPatternFlatSelectFunction.java   |  45 +++
 .../cep/functions/RichPatternSelectFunction.java   |  44 +++
 .../cep/functions/TimedOutPartialMatchHandler.java |  81 ++++
 .../adaptors/PatternFlatSelectAdapter.java         |  63 +++
 .../functions/adaptors/PatternSelectAdapter.java   |  63 +++
 .../adaptors/PatternTimeoutFlatSelectAdapter.java  | 111 ++++++
 .../adaptors/PatternTimeoutSelectAdapter.java      |  79 ++++
 .../main/java/org/apache/flink/cep/nfa/NFA.java    |   3 +-
 .../flink/cep/operator/CEPOperatorUtils.java       | 319 ----------------
 ...yedCEPPatternOperator.java => CepOperator.java} | 140 +++++--
 .../flink/cep/operator/FlatSelectCepOperator.java  |  69 ----
 .../cep/operator/FlatSelectTimeoutCepOperator.java | 131 -------
 .../flink/cep/operator/SelectCepOperator.java      |  58 ---
 .../cep/operator/SelectTimeoutCepOperator.java     | 120 ------
 .../operator/TimestampedSideOutputCollector.java   |  82 ----
 .../apache/flink/cep/operator/CEPOperatorTest.java |  92 +++--
 .../cep/operator/CepOperatorTestUtilities.java     |  29 +-
 .../operator/CepProcessFunctionContextTest.java    | 425 +++++++++++++++++++++
 .../apache/flink/cep/utils/CepOperatorBuilder.java | 192 ++++++++++
 .../org/apache/flink/cep/utils/OutputAsserter.java |   2 +-
 27 files changed, 1886 insertions(+), 988 deletions(-)

diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index 44f80af..dd17ea6 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -20,14 +20,13 @@ package org.apache.flink.cep.scala
 import java.util.{UUID, List => JList, Map => JMap}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.cep.functions.PatternProcessFunction
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
 import org.apache.flink.cep.scala.pattern.Pattern
 import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
 import org.apache.flink.util.Collector
 
-import org.apache.flink.cep.operator.CEPOperatorUtils
-import org.apache.flink.cep.scala.pattern.Pattern
 import scala.collection.Map
 
 /**
@@ -50,6 +49,20 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   def getComparator: EventComparator[T] = jPatternStream.getComparator
 
   /**
+    * Applies a process function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternProcessFunction]] is called.
+    *
+    * @param patternProcessFunction The pattern process function which is called for each detected
+    *                              pattern sequence.
+    * @tparam R Type of the resulting elements
+    * @return [[DataStream]] which contains the resulting elements from the pattern process function.
+    */
+  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R])
+  : DataStream[R] = {
+    asScalaStream(jPatternStream.process(patternProcessFunction, implicitly[TypeInformation[R]]))
+  }
+
+  /**
     * Applies a select function to the detected pattern sequence. For each pattern sequence the
     * provided [[PatternSelectFunction]] is called. The pattern select function can produce
     * exactly one resulting element.
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
index f3371c8..5f6055d 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -17,31 +17,31 @@
  */
 package org.apache.flink.cep.scala
 
-import org.apache.flink.api.common.functions.util.ListCollector
+import java.lang
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.functions.util.{FunctionUtils, ListCollector}
+import org.apache.flink.cep.functions.{PatternProcessFunction, TimedOutPartialMatchHandler}
+import org.apache.flink.cep.operator.CepOperator
 import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.transformations.{OneInputTransformation, TwoInputTransformation}
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util
 import org.apache.flink.util.{Collector, TestLogger}
-import org.apache.flink.types.{Either => FEither}
-import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
-import java.lang.{Long => JLong}
-import java.util.{Map => JMap}
-import java.util.{List => JList}
-
-import org.apache.flink.cep.operator.{FlatSelectCepOperator, FlatSelectTimeoutCepOperator, SelectCepOperator}
-import org.apache.flink.streaming.api.functions.co.CoMapFunction
-
-import scala.collection.JavaConverters._
-import scala.collection.Map
 import org.junit.Assert._
 import org.junit.Test
+import org.mockito.Mockito
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.collection.{Map, mutable}
 
 class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
 
   @Test
   @throws[Exception]
-  def testScalaJavaAPISelectFunForwarding {
+  def testScalaJavaAPISelectFunForwarding() {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
     val pattern: Pattern[(Int, Int), (Int, Int)] = Pattern.begin[(Int, Int)]("dummy")
@@ -51,17 +51,21 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
       .select((pattern: Map[String, Iterable[(Int, Int)]]) => {
         //verifies input parameter forwarding
         assertEquals(param, pattern)
-        param.get("begin").get(0)
+        param("begin").head
       })
-    val out = extractUserFunction[SelectCepOperator[(Int, Int), Byte, (Int, Int)]](result)
-      .getUserFunction.select(param.mapValues(_.asJava).asJava)
+
+    val outList = new java.util.ArrayList[(Int, Int)]
+    val outParam = new ListCollector[(Int, Int)](outList)
+
+    val fun = extractFun[(Int, Int), (Int, Int)](result)
+    fun.processMatch(param.mapValues(_.asJava).asJava, new ListTestContext, outParam)
     //verifies output parameter forwarding
-    assertEquals(param.get("begin").get(0), out)
+    assertEquals(param("begin").head, outList.get(0))
   }
 
   @Test
   @throws[Exception]
-  def testScalaJavaAPIFlatSelectFunForwarding {
+  def testScalaJavaAPIFlatSelectFunForwarding() {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val dummyDataStream: DataStream[List[Int]] = env.fromElements()
     val pattern: Pattern[List[Int], List[Int]] = Pattern.begin[List[Int]]("dummy")
@@ -76,18 +80,18 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
       .flatSelect((pattern: Map[String, Iterable[List[Int]]], out: Collector[List[Int]]) => {
         //verifies input parameter forwarding
         assertEquals(inParam, pattern)
-        out.collect(pattern.get("begin").get.head)
+        out.collect(pattern("begin").head)
       })
 
-    extractUserFunction[FlatSelectCepOperator[List[Int], Byte, List[Int]]](result).
-      getUserFunction.flatSelect(inParam.mapValues(_.asJava).asJava, outParam)
+    val fun = extractFun[List[Int], List[Int]](result)
+    fun.processMatch(inParam.mapValues(_.asJava).asJava, new ListTestContext, outParam)
     //verify output parameter forwarding and that flatMap function was actually called
     assertEquals(inList, outList.get(0))
   }
 
   @Test
   @throws[Exception]
-  def testTimeoutHandling: Unit = {
+  def testTimeoutHandling(): Unit = {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val dummyDataStream: DataStream[String] = env.fromElements()
     val pattern: Pattern[String, String] = Pattern.begin[String]("dummy")
@@ -95,8 +99,6 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
     val inParam = Map("begin" -> List("barfoo"))
     val outList = new java.util.ArrayList[Either[String, String]]
     val output = new ListCollector[Either[String, String]](outList)
-    val expectedOutput = List(Right("match"), Right("barfoo"), Left("timeout"), Left("barfoo"))
-      .asJava
 
     val outputTag = OutputTag[Either[String, String]]("timeouted")
     val result: DataStream[Either[String, String]] = pStream.flatSelect(outputTag) {
@@ -112,22 +114,45 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
           out.collect(Right(pattern("begin").head))
       }
 
-    val fun = extractUserFunction[FlatSelectTimeoutCepOperator[String, Either[String, String],
-      Either[String, String], Byte]](
-      result).getUserFunction
+    val fun = extractFun[String, Either[String, String]](result)
 
-    fun.getFlatSelectFunction.flatSelect(inParam.mapValues(_.asJava).asJava, output)
-    fun.getFlatTimeoutFunction.timeout(inParam.mapValues(_.asJava).asJava, 42L, output)
+    val ctx = new ListTestContext
+    fun.processMatch(inParam.mapValues(_.asJava).asJava, ctx, output)
+    fun.asInstanceOf[TimedOutPartialMatchHandler[String]]
+      .processTimedOutMatch(inParam.mapValues(_.asJava).asJava, ctx)
 
-    assertEquals(expectedOutput, outList)
+    assertEquals(List(Right("match"), Right("barfoo")).asJava, outList)
+    assertEquals(List(Left("timeout"), Left("barfoo")).asJava, ctx.getElements(outputTag).asJava)
   }
 
-  def extractUserFunction[T](dataStream: DataStream[_]) = {
-    dataStream.javaStream
+  def extractFun[IN, OUT](dataStream: DataStream[OUT]): PatternProcessFunction[IN, OUT] = {
+    val oper = dataStream.javaStream
       .getTransformation
       .asInstanceOf[OneInputTransformation[_, _]]
       .getOperator
-      .asInstanceOf[T]
+      .asInstanceOf[CepOperator[IN, Byte, OUT]]
+
+    val fun = oper.getUserFunction
+    FunctionUtils.setFunctionRuntimeContext(fun, Mockito.mock(classOf[RuntimeContext]))
+    FunctionUtils.openFunction(fun, new Configuration())
+    fun
+  }
+
+  class ListTestContext extends PatternProcessFunction.Context {
+
+    private val outputs = new mutable.HashMap[util.OutputTag[_], mutable.ListBuffer[Any]]()
+
+    def getElements(outputTag: OutputTag[_]): ListBuffer[Any] = {
+      outputs.getOrElse(outputTag, ListBuffer.empty)
+    }
+
+    override def output[X](outputTag: util.OutputTag[X], value: X): Unit = {
+      outputs.getOrElseUpdate(outputTag, ListBuffer.empty).append(value)
+    }
+
+    override def timestamp(): lang.Long = null
+
+    override def currentProcessingTime(): Long = System.currentTimeMillis()
   }
 
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternProcessFunctionBuilder.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternProcessFunctionBuilder.java
new file mode 100644
index 0000000..abdd061
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternProcessFunctionBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.adaptors.PatternFlatSelectAdapter;
+import org.apache.flink.cep.functions.adaptors.PatternSelectAdapter;
+import org.apache.flink.cep.functions.adaptors.PatternTimeoutFlatSelectAdapter;
+import org.apache.flink.cep.functions.adaptors.PatternTimeoutSelectAdapter;
+import org.apache.flink.util.OutputTag;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Builder for adapting pre-1.8 functions like {@link PatternFlatSelectFunction}, {@link PatternFlatTimeoutFunction}
+ * into {@link PatternProcessFunction}.
+ */
+@Internal
+class PatternProcessFunctionBuilder {
+
+	/**
+	 * Starts constructing a {@link PatternProcessFunction} from a {@link PatternFlatSelectFunction} that
+	 * emitted elements through {@link org.apache.flink.util.Collector}.
+	 */
+	static <IN, OUT> FlatSelectBuilder<IN, OUT> fromFlatSelect(final PatternFlatSelectFunction<IN, OUT> function) {
+		return new FlatSelectBuilder<>(function);
+	}
+
+	/**
+	 * Starts constructing a {@link PatternProcessFunction} from a {@link PatternSelectFunction} that
+	 * emitted elements through return value.
+	 */
+	static <IN, OUT> SelectBuilder<IN, OUT> fromSelect(final PatternSelectFunction<IN, OUT> function) {
+		return new SelectBuilder<>(function);
+	}
+
+	/**
+	 * Wraps {@link PatternFlatSelectFunction} in a builder. The builder can construct a
+	 * {@link PatternProcessFunction} adapter.
+	 */
+	static class FlatSelectBuilder<IN, OUT> {
+
+		private final PatternFlatSelectFunction<IN, OUT> flatSelectFunction;
+
+		FlatSelectBuilder(PatternFlatSelectFunction<IN, OUT> function) {
+			this.flatSelectFunction = checkNotNull(function);
+		}
+
+		<TIMED_OUT> FlatTimeoutSelectBuilder<IN, OUT, TIMED_OUT> withTimeoutHandler(
+				final OutputTag<TIMED_OUT> outputTag,
+				final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler) {
+			return new FlatTimeoutSelectBuilder<>(flatSelectFunction, timeoutHandler, outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternFlatSelectAdapter<>(flatSelectFunction);
+		}
+	}
+
+	/**
+	 * Wraps {@link PatternFlatSelectFunction} and {@link PatternFlatTimeoutFunction} in a builder. The builder will
+	 * create a {@link PatternProcessFunction} adapter that handles timed out partial matches as well.
+	 */
+	static class FlatTimeoutSelectBuilder<IN, OUT, TIMED_OUT> {
+		private final PatternFlatSelectFunction<IN, OUT> flatSelectFunction;
+
+		private final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler;
+		private final OutputTag<TIMED_OUT> outputTag;
+
+		FlatTimeoutSelectBuilder(
+				final PatternFlatSelectFunction<IN, OUT> flatSelectFunction,
+				final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler,
+				final OutputTag<TIMED_OUT> outputTag) {
+			this.flatSelectFunction = checkNotNull(flatSelectFunction);
+			this.timeoutHandler = checkNotNull(timeoutHandler);
+			this.outputTag = checkNotNull(outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternTimeoutFlatSelectAdapter<>(flatSelectFunction, timeoutHandler, outputTag);
+		}
+	}
+
+	/**
+	 * Wraps {@link PatternSelectFunction} in a builder. The builder can construct a
+	 * {@link PatternProcessFunction} adapter.
+	 */
+	static class SelectBuilder<IN, OUT> {
+
+		private final PatternSelectFunction<IN, OUT> selectFunction;
+
+		SelectBuilder(PatternSelectFunction<IN, OUT> function) {
+			this.selectFunction = checkNotNull(function);
+		}
+
+		<TIMED_OUT> TimeoutSelectBuilder<IN, OUT, TIMED_OUT> withTimeoutHandler(
+				final OutputTag<TIMED_OUT> outputTag,
+				final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler) {
+			return new TimeoutSelectBuilder<>(selectFunction, timeoutHandler, outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternSelectAdapter<>(selectFunction);
+		}
+	}
+
+	/**
+	 * Wraps {@link PatternSelectFunction} and {@link PatternTimeoutFunction} in a builder. The builder will create a
+	 * {@link PatternProcessFunction} adapter that handles timed out partial matches as well.
+	 */
+	static class TimeoutSelectBuilder<IN, OUT, TIMED_OUT> {
+		private final PatternSelectFunction<IN, OUT> selectFunction;
+
+		private final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler;
+		private final OutputTag<TIMED_OUT> outputTag;
+
+		TimeoutSelectBuilder(
+				final PatternSelectFunction<IN, OUT> flatSelectFunction,
+				final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler,
+				final OutputTag<TIMED_OUT> outputTag) {
+			this.selectFunction = checkNotNull(flatSelectFunction);
+			this.timeoutHandler = checkNotNull(timeoutHandler);
+			this.outputTag = checkNotNull(outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternTimeoutSelectAdapter<>(selectFunction, timeoutHandler, outputTag);
+		}
+	}
+}
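
Usage of the builder boils down to chaining the adapters together; a sketch from within the org.apache.flink.cep package (the class is package-private and @Internal), assuming pre-existing select/timeout functions and an output tag:

    package org.apache.flink.cep;

    import org.apache.flink.cep.functions.PatternProcessFunction;
    import org.apache.flink.util.OutputTag;

    // Illustrative only; BuilderUsageSketch is not part of this change.
    class BuilderUsageSketch {

        static <IN, OUT, T> PatternProcessFunction<IN, OUT> adapt(
                PatternSelectFunction<IN, OUT> select,
                PatternTimeoutFunction<IN, T> timeout,
                OutputTag<T> timedOutTag) {

            // The resulting adapter also implements TimedOutPartialMatchHandler,
            // so timed out partial matches end up in the given side output.
            return PatternProcessFunctionBuilder.fromSelect(select)
                .withTimeoutHandler(timedOutTag, timeout)
                .build();
        }
    }
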
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 521665f..40508fe 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.operator.CEPOperatorUtils;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -33,6 +34,9 @@ import org.apache.flink.util.OutputTag;
 
 import java.util.UUID;
 
+import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromFlatSelect;
+import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
+
 /**
  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
  * pattern sequences as a map of events associated with their names. The pattern is detected using a
@@ -85,6 +89,59 @@ public class PatternStream<T> {
 	}
 
 	/**
+	 * Applies a process function to the detected pattern sequence. For each pattern sequence the
+	 * provided {@link PatternProcessFunction} is called. In order to process timed out partial matches as well one can
+	 * use {@link TimedOutPartialMatchHandler} as an additional interface.
+	 *
+	 * @param patternProcessFunction The pattern process function which is called for each detected
+	 *                               pattern sequence.
+	 * @param <R> Type of the resulting elements
+	 * @return {@link DataStream} which contains the resulting elements from the pattern process
+	 *         function.
+	 */
+	public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction) {
+		// we have to extract the output type from the provided pattern selection function manually
+		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
+
+		final TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
+			patternProcessFunction,
+			PatternSelectFunction.class,
+			0,
+			1,
+			TypeExtractor.NO_INDEX,
+			inputStream.getType(),
+			null,
+			false);
+
+		return process(patternProcessFunction, returnType);
+	}
+
+	/**
+	 * Applies a process function to the detected pattern sequence. For each pattern sequence the
+	 * provided {@link PatternProcessFunction} is called. In order to process timed out partial matches as well one can
+	 * use {@link TimedOutPartialMatchHandler} as an additional interface.
+	 *
+	 * @param patternProcessFunction The pattern process function which is called for each detected
+	 *                              pattern sequence.
+	 * @param <R> Type of the resulting elements
+	 * @param outTypeInfo Explicit specification of output type.
+	 * @return {@link DataStream} which contains the resulting elements from the pattern process
+	 *         function.
+	 */
+	public <R> SingleOutputStreamOperator<R> process(
+			final PatternProcessFunction<T, R> patternProcessFunction,
+			final TypeInformation<R> outTypeInfo) {
+
+		return PatternStreamBuilder.createPatternStream(
+			inputStream,
+			pattern,
+			outTypeInfo,
+			comparator,
+			lateDataOutputTag,
+			clean(patternProcessFunction));
+	}
+
+	/**
 	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
 	 * provided {@link PatternSelectFunction} is called. The pattern select function can produce
 	 * exactly one resulting element.
@@ -99,7 +156,7 @@ public class PatternStream<T> {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
 
-		TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
 			0,
@@ -134,8 +191,14 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern select
 	 *         function.
 	 */
-	public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
-		return CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator, clean(patternSelectFunction), outTypeInfo, lateDataOutputTag);
+	public <R> SingleOutputStreamOperator<R> select(
+			final PatternSelectFunction<T, R> patternSelectFunction,
+			final TypeInformation<R> outTypeInfo) {
+
+		final PatternProcessFunction<T, R> processFunction =
+			fromSelect(clean(patternSelectFunction)).build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -152,7 +215,7 @@ public class PatternStream<T> {
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
 	 * @param patternTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param patternSelectFunction The pattern select function which is called for each detected
@@ -163,11 +226,11 @@ public class PatternStream<T> {
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> select(
-		final OutputTag<L> timeoutOutputTag,
-		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
-		final PatternSelectFunction<T, R> patternSelectFunction) {
+			final OutputTag<L> timedOutPartialMatchesTag,
+			final PatternTimeoutFunction<T, L> patternTimeoutFunction,
+			final PatternSelectFunction<T, R> patternSelectFunction) {
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
 			0,
@@ -178,7 +241,7 @@ public class PatternStream<T> {
 			false);
 
 		return select(
-			timeoutOutputTag,
+			timedOutPartialMatchesTag,
 			patternTimeoutFunction,
 			rightTypeInfo,
 			patternSelectFunction);
@@ -198,7 +261,7 @@ public class PatternStream<T> {
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
 	 * @param patternTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param outTypeInfo Explicit specification of output type.
@@ -210,19 +273,17 @@ public class PatternStream<T> {
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> select(
-			final OutputTag<L> timeoutOutputTag,
+			final OutputTag<L> timedOutPartialMatchesTag,
 			final PatternTimeoutFunction<T, L> patternTimeoutFunction,
 			final TypeInformation<R> outTypeInfo,
 			final PatternSelectFunction<T, R> patternSelectFunction) {
-		return CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternSelectFunction),
-			outTypeInfo,
-			timeoutOutputTag,
-			clean(patternTimeoutFunction),
-			lateDataOutputTag);
+
+		final PatternProcessFunction<T, R> processFunction =
+			fromSelect(clean(patternSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, clean(patternTimeoutFunction))
+				.build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -249,10 +310,10 @@ public class PatternStream<T> {
 	 */
 	@Deprecated
 	public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
-		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
-		final PatternSelectFunction<T, R> patternSelectFunction) {
+			final PatternTimeoutFunction<T, L> patternTimeoutFunction,
+			final PatternSelectFunction<T, R> patternSelectFunction) {
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> mainTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
 			0,
@@ -262,7 +323,7 @@ public class PatternStream<T> {
 			null,
 			false);
 
-		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<L> timeoutTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternTimeoutFunction,
 			PatternTimeoutFunction.class,
 			0,
@@ -272,23 +333,22 @@ public class PatternStream<T> {
 			null,
 			false);
 
-		final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo);
+		final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timeoutTypeInfo, mainTypeInfo);
 
-		final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternSelectFunction),
-			rightTypeInfo,
-			outputTag,
-			clean(patternTimeoutFunction),
-			lateDataOutputTag);
+		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timeoutTypeInfo);
 
-		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
+		final PatternProcessFunction<T, R> processFunction =
+				fromSelect(clean(patternSelectFunction))
+						.withTimeoutHandler(outputTag, clean(patternTimeoutFunction))
+						.build();
 
-		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
+		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
 
-		return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
+		return mainStream
+				.connect(timedOutStream)
+				.map(new CoMapTimeout<>())
+				.returns(outTypeInfo);
 	}
 
 	/**
@@ -302,10 +362,11 @@ public class PatternStream<T> {
 	 * @return {@link DataStream} which contains the resulting elements from the pattern flat select
 	 *         function.
 	 */
-	public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+	public <R> SingleOutputStreamOperator<R> flatSelect(
+			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
-		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
@@ -333,13 +394,11 @@ public class PatternStream<T> {
 	public <R> SingleOutputStreamOperator<R> flatSelect(
 			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction,
 			final TypeInformation<R> outTypeInfo) {
-		return CEPOperatorUtils.createPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternFlatSelectFunction),
-			outTypeInfo,
-			lateDataOutputTag);
+
+		final PatternProcessFunction<T, R> processFunction =
+			fromFlatSelect(clean(patternFlatSelectFunction)).build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -356,7 +415,7 @@ public class PatternStream<T> {
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies the side output containing timed out patterns
 	 * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param patternFlatSelectFunction The pattern select function which is called for each detected
@@ -367,11 +426,11 @@ public class PatternStream<T> {
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> flatSelect(
-		final OutputTag<L> timeoutOutputTag,
-		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
-		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+			final OutputTag<L> timedOutPartialMatchesTag,
+			final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
@@ -381,7 +440,10 @@ public class PatternStream<T> {
 			null,
 			false);
 
-		return flatSelect(timeoutOutputTag, patternFlatTimeoutFunction, rightTypeInfo, patternFlatSelectFunction);
+		return flatSelect(timedOutPartialMatchesTag,
+			patternFlatTimeoutFunction,
+			rightTypeInfo,
+			patternFlatSelectFunction);
 	}
 
 	/**
@@ -398,7 +460,7 @@ public class PatternStream<T> {
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies the side output containing timed out patterns
 	 * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param patternFlatSelectFunction The pattern select function which is called for each detected
@@ -410,20 +472,17 @@ public class PatternStream<T> {
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> flatSelect(
-		final OutputTag<L> timeoutOutputTag,
-		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
-		final TypeInformation<R> outTypeInfo,
-		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+			final OutputTag<L> timedOutPartialMatchesTag,
+			final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+			final TypeInformation<R> outTypeInfo,
+			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
-		return CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternFlatSelectFunction),
-			outTypeInfo,
-			timeoutOutputTag,
-			clean(patternFlatTimeoutFunction),
-			lateDataOutputTag);
+		final PatternProcessFunction<T, R> processFunction =
+			fromFlatSelect(clean(patternFlatSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, clean(patternFlatTimeoutFunction))
+				.build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -451,10 +510,10 @@ public class PatternStream<T> {
 	 */
 	@Deprecated
 	public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
-		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
-		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+			final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
-		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<L> timedOutTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatTimeoutFunction,
 			PatternFlatTimeoutFunction.class,
 			0,
@@ -464,7 +523,7 @@ public class PatternStream<T> {
 			null,
 			false);
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> mainTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
@@ -474,27 +533,25 @@ public class PatternStream<T> {
 			null,
 			false);
 
-		final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo);
+		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timedOutTypeInfo);
 
-		final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternFlatSelectFunction),
-			rightTypeInfo,
-			outputTag,
-			clean(patternFlatTimeoutFunction),
-			lateDataOutputTag);
+		final PatternProcessFunction<T, R> processFunction =
+			fromFlatSelect(clean(patternFlatSelectFunction))
+				.withTimeoutHandler(outputTag, clean(patternFlatTimeoutFunction))
+				.build();
 
+		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
 		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
+		final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timedOutTypeInfo, mainTypeInfo);
 
-		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
-
-		return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
+		return mainStream
+			.connect(timedOutStream)
+			.map(new CoMapTimeout<>())
+			.returns(outTypeInfo);
 	}
 
-	public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) {
-		this.lateDataOutputTag = clean(outputTag);
+	public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
+		this.lateDataOutputTag = clean(lateDataOutputTag);
 		return this;
 	}
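
Usage sketch (illustration only, not part of this patch): calling the reworked select/timeout API shown above, assuming the usual Flink CEP imports, a user-defined Event type, and an already constructed PatternStream<Event> named patternStream.

    final OutputTag<String> timedOutTag = new OutputTag<String>("timed-out-partial-matches") {};

    SingleOutputStreamOperator<String> fullMatches = patternStream.select(
        timedOutTag,
        new PatternTimeoutFunction<Event, String>() {
            @Override
            public String timeout(Map<String, List<Event>> match, long timeoutTimestamp) {
                return "timed out at " + timeoutTimestamp;
            }
        },
        new PatternSelectFunction<Event, String>() {
            @Override
            public String select(Map<String, List<Event>> match) {
                return "matched: " + match.keySet();
            }
        });

    // timed out partial matches arrive on the side output identified by the tag
    DataStream<String> timedOutPartialMatches = fullMatches.getSideOutput(timedOutTag);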
 
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
new file mode 100644
index 0000000..a2c430a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility method for creating {@link PatternStream}.
+ */
+@Internal
+final class PatternStreamBuilder {
+
+	/**
+	 * Creates a data stream containing the results of applying a {@link PatternProcessFunction} to fully matching event patterns.
+	 *
+	 * @param inputStream stream of input events
+	 * @param pattern pattern to be searched for in the stream
+	 * @param processFunction function to be applied to matching event sequences
+	 * @param outTypeInfo output TypeInformation of
+	 *        {@link PatternProcessFunction#processMatch(Map, PatternProcessFunction.Context, Collector)}
+	 * @param <IN> type of input events
+	 * @param <OUT> type of output events
+	 * @return Data stream containing the fully matched event sequences processed with the given {@link PatternProcessFunction}
+	 */
+	static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
+			final DataStream<IN> inputStream,
+			final Pattern<IN, ?> pattern,
+			final TypeInformation<OUT> outTypeInfo,
+			@Nullable final EventComparator<IN> comparator,
+			@Nullable final OutputTag<IN> lateDataOutputTag,
+			final PatternProcessFunction<IN, OUT> processFunction) {
+
+		checkNotNull(inputStream);
+		checkNotNull(pattern);
+		checkNotNull(outTypeInfo);
+		checkNotNull(processFunction);
+
+		final TypeSerializer<IN> inputSerializer =
+				inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+
+		// check whether we use processing time
+		final boolean isProcessingTime =
+			inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
+
+		// compile our pattern into a NFAFactory to instantiate NFAs later on
+		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
+
+		CepOperator<IN, K, OUT> operator = new CepOperator<>(
+			inputSerializer,
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			pattern.getAfterMatchSkipStrategy(),
+			processFunction,
+			lateDataOutputTag);
+
+		final SingleOutputStreamOperator<OUT> patternStream;
+		if (inputStream instanceof KeyedStream) {
+			KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
+
+			patternStream = keyedStream.transform(
+				"CepOperator",
+				outTypeInfo,
+				operator);
+		} else {
+			KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
+
+			patternStream = inputStream.keyBy(keySelector).transform(
+				"GlobalCepOperator",
+				outTypeInfo,
+				operator
+				).forceNonParallel();
+		}
+
+		return patternStream;
+	}
+
+	private PatternStreamBuilder() {
+	}
+}
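
Sketch of how the public PatternStream#process method (referenced in the hunks above) is expected to delegate to this builder; the field names and the clean() call are assumptions inferred from the surrounding diff, not verbatim code:

    public <R> SingleOutputStreamOperator<R> process(
            final PatternProcessFunction<T, R> patternProcessFunction,
            final TypeInformation<R> outTypeInfo) {
        return PatternStreamBuilder.createPatternStream(
            inputStream, pattern, outTypeInfo, comparator, lateDataOutputTag, clean(patternProcessFunction));
    }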
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
new file mode 100644
index 0000000..23367cd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.context;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+
+/**
+ * Enables access to time-related characteristics such as the current processing time or the timestamp of the
+ * currently processed element. Used in {@link PatternProcessFunction} and
+ * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}.
+ */
+@PublicEvolving
+public interface TimerContext {
+
+	/**
+	 * Timestamp of the element currently being processed.
+	 *
+	 * <p>This might be {@code null}, for example if the time characteristic of your program
+	 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+	 */
+	Long timestamp();
+
+	/** Returns the current processing time. */
+	long currentProcessingTime();
+
+}
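
A short sketch of the intended fallback when no event-time timestamp is available, mirroring how the timeout adapters later in this change derive a result timestamp:

    // inside a function that receives a TimerContext (e.g. a PatternProcessFunction.Context named ctx)
    final Long eventTimestamp = ctx.timestamp();           // null e.g. under processing time
    final long resultTimestamp =
        eventTimestamp != null ? eventTimestamp : ctx.currentProcessingTime();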
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
new file mode 100644
index 0000000..b392501
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.cep.context.TimerContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A function that is called with a map of detected events, identified by their names.
+ * The names are defined by the {@link org.apache.flink.cep.pattern.Pattern} specifying
+ * the sought-after pattern. This is the preferred way to process found matches.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...
+ *
+ * DataStream<OUT> result = pattern.process(new MyPatternProcessFunction());
+ * }</pre>
+ * @param <IN> type of incoming elements
+ * @param <OUT> type of produced elements based on found matches
+ */
+@PublicEvolving
+public abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {
+
+	/**
+	 * Generates resulting elements given a map of detected pattern events. The events
+	 * are identified by their specified names.
+	 *
+	 * <p>{@link PatternProcessFunction.Context#timestamp()} in this case returns the time of the last element that was
+	 * assigned to the match, resulting in this partial match being finished.
+	 *
+	 * @param match map containing the found pattern. Events are identified by their names.
+	 * @param ctx enables access to time features and emitting results through side outputs
+	 * @param out Collector used to output the generated elements
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	public abstract void processMatch(
+		final Map<String, List<IN>> match,
+		final Context ctx,
+		final Collector<OUT> out) throws Exception;
+
+	/**
+	 * Gives access to time-related characteristics and enables emitting elements to side outputs.
+	 */
+	public interface Context extends TimerContext {
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 *
+		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		<X> void output(final OutputTag<X> outputTag, final X value);
+	}
+}
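
A minimal sketch of a concrete PatternProcessFunction; the Event type and the "start" pattern name are illustrative assumptions, and the usual java.util and Flink imports are omitted:

    public class MyPatternProcessFunction extends PatternProcessFunction<Event, String> {

        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) {
            // "start" must be a pattern name defined in the Pattern that produced this match
            out.collect("match starting with " + match.get("start").get(0) + " at " + ctx.timestamp());
        }
    }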
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternFlatSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternFlatSelectFunction.java
new file mode 100644
index 0000000..fdc5640
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternFlatSelectFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternFlatSelectFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public abstract class RichPatternFlatSelectFunction<IN, OUT>
+		extends AbstractRichFunction
+		implements PatternFlatSelectFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract void flatSelect(final Map<String, List<IN>> pattern, final Collector<OUT> out) throws Exception;
+}
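
A brief sketch of a rich flat-select function that uses its RuntimeContext in open(); the Event type and the metric name are assumptions:

    public class CountingFlatSelect extends RichPatternFlatSelectFunction<Event, String> {

        private transient Counter matchCounter;

        @Override
        public void open(Configuration parameters) throws Exception {
            // register a counter on the operator's metric group
            matchCounter = getRuntimeContext().getMetricGroup().counter("matches");
        }

        @Override
        public void flatSelect(Map<String, List<Event>> pattern, Collector<String> out) {
            matchCounter.inc();
            out.collect("match over " + pattern.keySet());
        }
    }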
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternSelectFunction.java
new file mode 100644
index 0000000..a4bf327
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/RichPatternSelectFunction.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions;
+
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.cep.PatternSelectFunction;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Rich variant of the {@link PatternSelectFunction}. As a {@link RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link RichFunction#open(org.apache.flink.configuration.Configuration)} and
+ * {@link RichFunction#close()}.
+ *
+ * @param <IN> Type of the input elements
+ * @param <OUT> Type of the output element
+ */
+public abstract class RichPatternSelectFunction<IN, OUT>
+		extends AbstractRichFunction
+		implements PatternSelectFunction<IN, OUT> {
+
+	private static final long serialVersionUID = 1L;
+
+	public abstract OUT select(final Map<String, List<IN>> pattern) throws Exception;
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
new file mode 100644
index 0000000..2871039
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enables handling timed out partial matches. It shall be used in a mixin style. If you need your
+ * {@link PatternProcessFunction} to be able to handle timed out partial matches, implement this interface as well.
+ * Example:
+ *
+ * <pre>
+ * {@code
+ * private class MyFunction extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+ *
+ * }
+ * }
+ * </pre>
+ *
+ * @param <IN> type of input elements
+ */
+@PublicEvolving
+public interface TimedOutPartialMatchHandler<IN> {
+
+	/**
+	 * Called for every timed out partial match (due to {@link org.apache.flink.cep.pattern.Pattern#within(Time)}).
+	 * It enables custom handling, e.g. one can emit the timed out results through a side output:
+	 *
+	 * <pre>
+	 * {@code
+	 *
+	 * private final OutputTag<T> timedOutPartialMatchesTag = ...
+	 *
+	 * private class MyFunction extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+	 *
+	 *     @Override
+	 *     public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+	 *          ...
+	 *     }
+	 *
+	 *     @Override
+	 *     void processTimedOutMatch(Map<String, List<IN>> match, PatternProcessFunction.Context ctx) throws Exception {
+	 *          ctx.output(timedOutPartialMatchesTag, match);
+	 *     }
+	 * }
+	 * }
+	 * </pre>
+	 *
+	 * <p>{@link PatternProcessFunction.Context#timestamp()} in this case returns the earliest time at which we can
+	 * say that the partial match will not become a full match, which is effectively the timestamp of the first element
+	 * assigned to the partial match plus the duration given to {@link org.apache.flink.cep.pattern.Pattern#within(Time)}.
+	 *
+	 * @param match map containing the timed out partial match. Events are identified by their names.
+	 * @param ctx enables access to time features and emitting results through side outputs
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void processTimedOutMatch(
+		final Map<String, List<IN>> match,
+		final PatternProcessFunction.Context ctx) throws Exception;
+}
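
A sketch combining both interfaces and reading the side output afterwards; the Event type and the tag name are illustrative assumptions:

    final OutputTag<String> timedOutTag = new OutputTag<String>("timed-out-partial-matches") {};

    class MyFunction extends PatternProcessFunction<Event, String>
            implements TimedOutPartialMatchHandler<Event> {

        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) {
            out.collect("complete match: " + match.keySet());
        }

        @Override
        public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) {
            ctx.output(timedOutTag, "partial match timed out: " + match.keySet());
        }
    }

    // DataStream<String> timedOut = patternStream.process(new MyFunction()).getSideOutput(timedOutTag);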
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternFlatSelectAdapter.java
new file mode 100644
index 0000000..90c9397
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternFlatSelectAdapter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses {@link PatternFlatSelectFunction} with {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternFlatSelectAdapter<IN, OUT> extends PatternProcessFunction<IN, OUT> {
+
+	private final PatternFlatSelectFunction<IN, OUT> flatSelectFunction;
+
+	public PatternFlatSelectAdapter(final PatternFlatSelectFunction<IN, OUT> flatSelectFunction) {
+		this.flatSelectFunction = checkNotNull(flatSelectFunction);
+	}
+
+	@Override
+	public void open(final Configuration parameters) throws Exception {
+		FunctionUtils.setFunctionRuntimeContext(flatSelectFunction, getRuntimeContext());
+		FunctionUtils.openFunction(flatSelectFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(flatSelectFunction);
+	}
+
+	@Override
+	public void processMatch(
+			final Map<String, List<IN>> match,
+			final Context ctx,
+			final Collector<OUT> out) throws Exception {
+		flatSelectFunction.flatSelect(match, out);
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternSelectAdapter.java
new file mode 100644
index 0000000..76ccf9e
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternSelectAdapter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses {@link PatternSelectFunction} with {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternSelectAdapter<IN, OUT> extends PatternProcessFunction<IN, OUT> {
+
+	private final PatternSelectFunction<IN, OUT> selectFunction;
+
+	public PatternSelectAdapter(final PatternSelectFunction<IN, OUT> selectFunction) {
+		this.selectFunction = checkNotNull(selectFunction);
+	}
+
+	@Override
+	public void open(final Configuration parameters) throws Exception {
+		FunctionUtils.setFunctionRuntimeContext(selectFunction, getRuntimeContext());
+		FunctionUtils.openFunction(selectFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(selectFunction);
+	}
+
+	@Override
+	public void processMatch(
+			final Map<String, List<IN>> match,
+			final Context ctx,
+			final Collector<OUT> out) throws Exception {
+		out.collect(selectFunction.select(match));
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
new file mode 100644
index 0000000..ab9c97d
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternFlatTimeoutFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses the combination of a {@link PatternFlatSelectFunction} and a {@link PatternFlatTimeoutFunction}
+ * with {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternTimeoutFlatSelectAdapter<IN, OUT, T>
+		extends PatternFlatSelectAdapter<IN, OUT>
+		implements TimedOutPartialMatchHandler<IN> {
+
+	private final PatternFlatTimeoutFunction<IN, T> flatTimeoutFunction;
+	private final OutputTag<T> timedOutPartialMatchesTag;
+
+	private transient SideCollector<T> sideCollector;
+
+	public PatternTimeoutFlatSelectAdapter(
+			PatternFlatSelectFunction<IN, OUT> flatSelectFunction,
+			PatternFlatTimeoutFunction<IN, T> flatTimeoutFunction,
+			OutputTag<T> timedOutPartialMatchesTag) {
+		super(flatSelectFunction);
+		this.flatTimeoutFunction = checkNotNull(flatTimeoutFunction);
+		this.timedOutPartialMatchesTag = checkNotNull(timedOutPartialMatchesTag);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.setFunctionRuntimeContext(flatTimeoutFunction, getRuntimeContext());
+		FunctionUtils.openFunction(flatTimeoutFunction, parameters);
+
+		if (sideCollector == null) {
+			sideCollector = new SideCollector<>(checkNotNull(timedOutPartialMatchesTag));
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(flatTimeoutFunction);
+	}
+
+	@Override
+	public void processTimedOutMatch(
+			Map<String, List<IN>> match,
+			Context ctx) throws Exception {
+		sideCollector.setCtx(ctx);
+		long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
+		flatTimeoutFunction.timeout(match, timestamp, sideCollector);
+	}
+
+	/**
+	 * Adapter that emits timed out results from a {@link PatternFlatTimeoutFunction}, which expects a {@link Collector},
+	 * through the {@link PatternProcessFunction.Context} of the enclosing {@link PatternProcessFunction}.
+	 */
+	private static final class SideCollector<T> implements Collector<T> {
+
+		private final OutputTag<T> timedOutPartialMatchesTag;
+
+		private transient Context ctx;
+
+		private SideCollector(OutputTag<T> timedOutPartialMatchesTag) {
+			this.timedOutPartialMatchesTag = checkNotNull(timedOutPartialMatchesTag);
+		}
+
+		public void setCtx(Context ctx) {
+			this.ctx = ctx;
+		}
+
+		@Override
+		public void collect(T record) {
+			ctx.output(timedOutPartialMatchesTag, record);
+		}
+
+		@Override
+		public void close() {}
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
new file mode 100644
index 0000000..29b0cf7
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternTimeoutFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses the combination of a {@link PatternSelectFunction} and a {@link PatternTimeoutFunction}
+ * with {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternTimeoutSelectAdapter<IN, OUT, T>
+		extends PatternSelectAdapter<IN, OUT>
+		implements TimedOutPartialMatchHandler<IN> {
+
+	private final PatternTimeoutFunction<IN, T> timeoutFunction;
+	private final OutputTag<T> timedOutPartialMatchesTag;
+
+	public PatternTimeoutSelectAdapter(
+			final PatternSelectFunction<IN, OUT> selectFunction,
+			final PatternTimeoutFunction<IN, T> timeoutFunction,
+			final OutputTag<T> timedOutPartialMatchesTag) {
+		super(selectFunction);
+		this.timeoutFunction = checkNotNull(timeoutFunction);
+		this.timedOutPartialMatchesTag = checkNotNull(timedOutPartialMatchesTag);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.setFunctionRuntimeContext(timeoutFunction, getRuntimeContext());
+		FunctionUtils.openFunction(timeoutFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(timeoutFunction);
+	}
+
+	@Override
+	public void processTimedOutMatch(
+			final Map<String, List<IN>> match,
+			final Context ctx) throws Exception {
+
+		final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
+		final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp);
+
+		ctx.output(timedOutPartialMatchesTag, timedOutPatternResult);
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 190727f..b1ddb85 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -35,7 +35,6 @@ import org.apache.flink.cep.nfa.sharedbuffer.EventId;
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
-import org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
@@ -61,7 +60,7 @@ import static org.apache.flink.cep.nfa.MigrationUtils.deserializeComputationStat
 /**
  * Non-deterministic finite automaton implementation.
  *
- * <p>The {@link AbstractKeyedCEPPatternOperator CEP operator}
+ * <p>The {@link org.apache.flink.cep.operator.CepOperator CEP operator}
  * keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones.
  * When an event gets processed, it updates the NFA's internal state machine.
  *
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
deleted file mode 100644
index 3aca758..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.NullByteKeySelector;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.PatternFlatTimeoutFunction;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.PatternStream;
-import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.util.OutputTag;
-
-/**
- * Utility methods for creating {@link PatternStream}.
- */
-public class CEPOperatorUtils {
-
-	/**
-	 * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param <IN> type of input events
-	 * @param <OUT> type of output events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction}
-	 */
-	public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternSelectFunction<IN, OUT> selectFunction,
-			final TypeInformation<OUT> outTypeInfo,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new SelectCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "SelectCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalSelectCepOperator";
-			}
-		});
-	}
-
-	/**
-	 * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param <IN> type of input events
-	 * @param <OUT> type of output events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction}
-	 */
-	public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternFlatSelectFunction<IN, OUT> selectFunction,
-			final TypeInformation<OUT> outTypeInfo,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new FlatSelectCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "FlatSelectCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalFlatSelectCepOperator";
-			}
-		});
-	}
-
-	/**
-	 * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns and
-	 * also timed out partially matched with applied {@link PatternFlatTimeoutFunction} as a sideoutput.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param outputTag {@link OutputTag} for a side-output with timed out matches
-	 * @param timeoutFunction function to be applied to timed out event sequences
-	 * @param <IN> type of input events
-	 * @param <OUT1> type of fully matched events
-	 * @param <OUT2> type of timed out events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction} that
-	 * contains timed out patterns with applied {@link PatternFlatTimeoutFunction} as side-output
-	 */
-	public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternFlatSelectFunction<IN, OUT1> selectFunction,
-			final TypeInformation<OUT1> outTypeInfo,
-			final OutputTag<OUT2> outputTag,
-			final PatternFlatTimeoutFunction<IN, OUT2> timeoutFunction,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT1> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new FlatSelectTimeoutCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					timeoutFunction,
-					outputTag,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "FlatSelectTimeoutCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalFlatSelectTimeoutCepOperator";
-			}
-		});
-	}
-
-	/**
-	 * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns and
-	 * also timed out partially matched with applied {@link PatternTimeoutFunction} as a sideoutput.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param outputTag {@link OutputTag} for a side-output with timed out matches
-	 * @param timeoutFunction function to be applied to timed out event sequences
-	 * @param <IN> type of input events
-	 * @param <OUT1> type of fully matched events
-	 * @param <OUT2> type of timed out events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction} that
-	 * contains timed out patterns with applied {@link PatternTimeoutFunction} as side-output
-	 */
-	public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternSelectFunction<IN, OUT1> selectFunction,
-			final TypeInformation<OUT1> outTypeInfo,
-			final OutputTag<OUT2> outputTag,
-			final PatternTimeoutFunction<IN, OUT2> timeoutFunction,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT1> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new SelectTimeoutCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					timeoutFunction,
-					outputTag,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "SelectTimeoutCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalSelectTimeoutCepOperator";
-			}
-		});
-	}
-
-	private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final TypeInformation<OUT> outTypeInfo,
-			final boolean timeoutHandling,
-			final EventComparator<IN> comparator,
-			final OperatorBuilder<IN, OUT> operatorBuilder) {
-		final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
-
-		// check whether we use processing time
-		final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		// compile our pattern into a NFAFactory to instantiate NFAs later on
-		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
-
-		final SingleOutputStreamOperator<OUT> patternStream;
-
-		if (inputStream instanceof KeyedStream) {
-			KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
-
-			patternStream = keyedStream.transform(
-				operatorBuilder.getKeyedOperatorName(),
-				outTypeInfo,
-				operatorBuilder.build(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					pattern.getAfterMatchSkipStrategy()));
-		} else {
-			KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
-
-			patternStream = inputStream.keyBy(keySelector).transform(
-				operatorBuilder.getOperatorName(),
-				outTypeInfo,
-				operatorBuilder.build(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					pattern.getAfterMatchSkipStrategy()
-				)).forceNonParallel();
-		}
-
-		return patternStream;
-	}
-
-	private interface OperatorBuilder<IN, OUT> {
-			OneInputStreamOperator<IN, OUT> build(
-			TypeSerializer<IN> inputSerializer,
-			boolean isProcessingTime,
-			NFACompiler.NFAFactory<IN> nfaFactory,
-			EventComparator<IN> comparator,
-			AfterMatchSkipStrategy skipStrategy);
-
-		String getKeyedOperatorName();
-
-		String getOperatorName();
-	}
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
similarity index 78%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
rename to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index c603741..0fd7053 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.cep.operator;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -30,6 +30,8 @@ import org.apache.flink.api.common.typeutils.base.ListSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.NFA.MigratedNFA;
 import org.apache.flink.cep.nfa.NFAState;
@@ -50,11 +52,14 @@ import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -64,18 +69,17 @@ import java.util.PriorityQueue;
 import java.util.stream.Stream;
 
 /**
- * Abstract CEP pattern operator for a keyed input stream. For each key, the operator creates
+ * CEP pattern operator for a keyed input stream. For each key, the operator creates
  * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
- * stored using the managed keyed state. Additionally, the set of all seen keys is kept as part of the
- * operator state. This is necessary to trigger the execution for all keys upon receiving a new
- * watermark.
+ * stored using the managed keyed state.
  *
  * @param <IN> Type of the input elements
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
  */
-public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function>
-		extends AbstractUdfStreamOperator<OUT, F>
+@Internal
+public class CepOperator<IN, KEY, OUT>
+		extends AbstractUdfStreamOperator<OUT, PatternProcessFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
 
 	private static final long serialVersionUID = -4166778210774160757L;
@@ -105,29 +109,38 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 	 */
 	private long lastWatermark;
 
+	/** Comparator for secondary sorting. Primary sorting is always done on time. */
 	private final EventComparator<IN> comparator;
 
 	/**
 	 * {@link OutputTag} to use for late arriving events. Elements with timestamp smaller than
 	 * the current watermark will be emitted to this.
 	 */
-	protected final OutputTag<IN> lateDataOutputTag;
+	private final OutputTag<IN> lateDataOutputTag;
+
+	/** Strategy that determines which elements to skip after a match was found. */
+	private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+	/** Context passed to user function. */
+	private transient ContextFunctionImpl context;
 
-	protected final AfterMatchSkipStrategy afterMatchSkipStrategy;
+	/** Main output collector that sets a proper timestamp on the StreamRecord. */
+	private transient TimestampedCollector<OUT> collector;
 
-	public AbstractKeyedCEPPatternOperator(
+	public CepOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
 			final NFACompiler.NFAFactory<IN> nfaFactory,
-			final EventComparator<IN> comparator,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy,
-			final F function,
-			final OutputTag<IN> lateDataOutputTag) {
+			@Nullable final EventComparator<IN> comparator,
+			@Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
+			final PatternProcessFunction<IN, OUT> function,
+			@Nullable final OutputTag<IN> lateDataOutputTag) {
 		super(function);
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
-		this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+
+		this.isProcessingTime = isProcessingTime;
 		this.comparator = comparator;
 		this.lateDataOutputTag = lateDataOutputTag;
 
@@ -183,15 +196,16 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 	@Override
 	public void open() throws Exception {
 		super.open();
-
 		timerService = getInternalTimerService(
 				"watermark-callbacks",
 				VoidNamespaceSerializer.INSTANCE,
 				this);
 
 		this.nfa = nfaFactory.createNFA();
-
 		openNFA(nfa);
+
+		context = new ContextFunctionImpl();
+		collector = new TimestampedCollector<>(output);
 	}
 
 	@Override
@@ -404,8 +418,83 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 	private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
 		try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
 			Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
-				nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
-			processTimedOutSequences(timedOut, timestamp);
+					nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
+			if (!timedOut.isEmpty()) {
+				processTimedOutSequences(timedOut);
+			}
+		}
+	}
+
+	private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
+		PatternProcessFunction<IN, OUT> function = getUserFunction();
+		setTimestamp(timestamp);
+		for (Map<String, List<IN>> matchingSequence : matchingSequences) {
+			function.processMatch(matchingSequence, context, collector);
+		}
+	}
+
+	private void processTimedOutSequences(Collection<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences) throws Exception {
+		PatternProcessFunction<IN, OUT> function = getUserFunction();
+		if (function instanceof TimedOutPartialMatchHandler) {
+
+			@SuppressWarnings("unchecked")
+			TimedOutPartialMatchHandler<IN> timeoutHandler = (TimedOutPartialMatchHandler<IN>) function;
+
+			for (Tuple2<Map<String, List<IN>>, Long> matchingSequence : timedOutSequences) {
+				setTimestamp(matchingSequence.f1);
+				timeoutHandler.processTimedOutMatch(matchingSequence.f0, context);
+			}
+		}
+	}
+
+	private void setTimestamp(long timestamp) {
+		if (!isProcessingTime) {
+			collector.setAbsoluteTimestamp(timestamp);
+			context.setTimestamp(timestamp);
+		}
+	}
+
+	/**
+	 * Implementation of {@link PatternProcessFunction.Context}. Designed to be instantiated once per operator.
+	 * It serves three purposes:
+	 *  <ul>
+	 *      <li>gives access to currentProcessingTime through {@link InternalTimerService}</li>
+	 *      <li>gives access to the timestamp of the current record (or null in processing time)</li>
+	 *      <li>enables side outputs with proper StreamRecord timestamp handling based on either processing or
+	 *          event time</li>
+	 *  </ul>
+	 */
+	private class ContextFunctionImpl implements PatternProcessFunction.Context {
+
+		private Long timestamp;
+
+		@Override
+		public <X> void output(final OutputTag<X> outputTag, final X value) {
+			final StreamRecord<X> record;
+			if (isProcessingTime) {
+				record = new StreamRecord<>(value);
+			} else {
+				record = new StreamRecord<>(value, timestamp());
+			}
+			output.collect(outputTag, record);
+		}
+
+		void setTimestamp(long timestamp) {
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public Long timestamp() {
+			if (isProcessingTime) {
+				return null;
+			} else {
+				return timestamp;
+			}
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return timerService.currentProcessingTime();
 		}
 	}
 
@@ -429,29 +518,22 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Fu
 		}
 	}
 
-	protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception;
-
-	protected void processTimedOutSequences(
-			Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences,
-			long timestamp) throws Exception {
-	}
-
 	//////////////////////			Testing Methods			//////////////////////
 
 	@VisibleForTesting
-	public boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
+	boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
 		setCurrentKey(key);
 		return !partialMatches.isEmpty();
 	}
 
 	@VisibleForTesting
-	public boolean hasNonEmptyPQ(KEY key) throws Exception {
+	boolean hasNonEmptyPQ(KEY key) throws Exception {
 		setCurrentKey(key);
 		return elementQueueState.keys().iterator().hasNext();
 	}
 
 	@VisibleForTesting
-	public int getPQSize(KEY key) throws Exception {
+	int getPQSize(KEY key) throws Exception {
 		setCurrentKey(key);
 		int counter = 0;
 		for (List<IN> elements: elementQueueState.values()) {
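
From the user function's perspective, the ContextFunctionImpl introduced above exposes exactly three things: the timestamp of the matched record (null in processing time), the current processing time, and side-output emission with matching timestamp semantics. A minimal, hypothetical PatternProcessFunction illustrating those accesses; the event type, tag name, and class name are made up for the example:

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

// Hedged illustration only: how a user function might read the Context added above.
public class ContextAccessExample extends PatternProcessFunction<String, String> {

	// hypothetical side output for auxiliary records
	private final OutputTag<String> auditTag = new OutputTag<String>("audit") {};

	@Override
	public void processMatch(Map<String, List<String>> match, Context ctx, Collector<String> out) {
		// null in processing time, the match timestamp in event time
		Long matchTimestamp = ctx.timestamp();
		// always available, backed by the operator's InternalTimerService
		long processingTime = ctx.currentProcessingTime();

		out.collect("matched at " + matchTimestamp);
		// emitted with the same timestamp semantics as the main output
		ctx.output(auditTag, "processed at " + processingTime);
	}
}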
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
deleted file mode 100644
index df54b53..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully matched event patterns.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT> Type of the output elements
- */
-public class FlatSelectCepOperator<IN, KEY, OUT>
-	extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternFlatSelectFunction<IN, OUT>> {
-	private static final long serialVersionUID = 5845993459551561518L;
-
-	public FlatSelectCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternFlatSelectFunction<IN, OUT> function,
-		OutputTag<IN> lateDataOutputTag) {
-		super(inputSerializer, isProcessingTime, nfaFactory, comparator, skipStrategy, function, lateDataOutputTag);
-	}
-
-	private transient TimestampedCollector<OUT> collector;
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-	}
-
-	@Override
-	protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			collector.setAbsoluteTimestamp(timestamp);
-			getUserFunction().flatSelect(match, collector);
-		}
-	}
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
deleted file mode 100644
index 642c92a..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.PatternFlatTimeoutFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternFlatSelectFunction} to fully
- * matched event patterns and {@link PatternFlatTimeoutFunction} to timed out ones. The timed out elements are returned
- * as a side-output.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT1> Type of the output elements
- * @param <OUT2> Type of the timed out output elements
- */
-public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends
-	AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, FlatSelectTimeoutCepOperator.FlatSelectWrapper<IN, OUT1, OUT2>> {
-
-	private transient TimestampedCollector<OUT1> collector;
-
-	private transient TimestampedSideOutputCollector<OUT2> sideOutputCollector;
-
-	private OutputTag<OUT2> timedOutOutputTag;
-
-	public FlatSelectTimeoutCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternFlatSelectFunction<IN, OUT1> flatSelectFunction,
-		PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction,
-		OutputTag<OUT2> outputTag,
-		OutputTag<IN> lateDataOutputTag) {
-		super(
-			inputSerializer,
-			isProcessingTime,
-			nfaFactory,
-			comparator,
-			skipStrategy,
-			new FlatSelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
-			lateDataOutputTag);
-		this.timedOutOutputTag = outputTag;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-		sideOutputCollector = new TimestampedSideOutputCollector<>(timedOutOutputTag, output);
-	}
-
-	@Override
-	protected void processMatchedSequences(
-		Iterable<Map<String, List<IN>>> matchingSequences,
-		long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			getUserFunction().getFlatSelectFunction().flatSelect(match, collector);
-		}
-	}
-
-	@Override
-	protected void processTimedOutSequences(
-		Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
-		for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
-			sideOutputCollector.setAbsoluteTimestamp(timestamp);
-			getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1, sideOutputCollector);
-		}
-	}
-
-	/**
-	 * Wrapper that enables storing {@link PatternFlatSelectFunction} and {@link PatternFlatTimeoutFunction} functions
-	 * in one udf.
-	 */
-	@Internal
-	public static class FlatSelectWrapper<IN, OUT1, OUT2> implements Function {
-
-		private static final long serialVersionUID = -8320546120157150202L;
-
-		private PatternFlatSelectFunction<IN, OUT1> flatSelectFunction;
-		private PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction;
-
-		@VisibleForTesting
-		public PatternFlatSelectFunction<IN, OUT1> getFlatSelectFunction() {
-			return flatSelectFunction;
-		}
-
-		@VisibleForTesting
-		public PatternFlatTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() {
-			return flatTimeoutFunction;
-		}
-
-		public FlatSelectWrapper(
-			PatternFlatSelectFunction<IN, OUT1> flatSelectFunction,
-			PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction) {
-			this.flatSelectFunction = flatSelectFunction;
-			this.flatTimeoutFunction = flatTimeoutFunction;
-		}
-	}
-}
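
The wrapper deleted above combined a flat-select function and a flat-timeout function in one udf. After this change the same pairing is expressed by a single PatternProcessFunction that also implements TimedOutPartialMatchHandler, with timed out partial matches routed to a side output through the Context (the test class TimedOutProcessFunction further down in this patch does exactly that). A hedged sketch with illustrative types and names:

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

// Hedged sketch of the replacement for FlatSelectWrapper: one function handles both
// complete matches and timed out partial matches.
public class MatchAndTimeoutFunction
		extends PatternProcessFunction<String, String>
		implements TimedOutPartialMatchHandler<String> {

	private final OutputTag<String> timedOutTag;

	public MatchAndTimeoutFunction(OutputTag<String> timedOutTag) {
		this.timedOutTag = timedOutTag;
	}

	@Override
	public void processMatch(Map<String, List<String>> match, Context ctx, Collector<String> out) {
		out.collect("match: " + match);
	}

	@Override
	public void processTimedOutMatch(Map<String, List<String>> match, Context ctx) {
		// partial matches go to the side output; the operator attaches the timeout timestamp
		ctx.output(timedOutTag, "timed out: " + match);
	}
}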
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
deleted file mode 100644
index ad335e5..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternSelectFunction} to fully matched event patterns.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT> Type of the output elements
- */
-public class SelectCepOperator<IN, KEY, OUT>
-	extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternSelectFunction<IN, OUT>> {
-	public SelectCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternSelectFunction<IN, OUT> function,
-		OutputTag<IN> lateDataOutputTag) {
-		super(inputSerializer, isProcessingTime, nfaFactory, comparator, skipStrategy, function, lateDataOutputTag);
-	}
-
-	@Override
-	protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			output.collect(new StreamRecord<>(getUserFunction().select(match), timestamp));
-		}
-	}
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
deleted file mode 100644
index 73ac709..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies given {@link PatternSelectFunction} to fully
- * matched event patterns and {@link PatternTimeoutFunction} to timed out ones. The timed out elements are returned
- * as a side-output.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT1> Type of the output elements
- * @param <OUT2> Type of the timed out output elements
- */
-public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
-	extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {
-
-	private OutputTag<OUT2> timedOutOutputTag;
-
-	public SelectTimeoutCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		final EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternSelectFunction<IN, OUT1> flatSelectFunction,
-		PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
-		OutputTag<OUT2> outputTag,
-		OutputTag<IN> lateDataOutputTag) {
-		super(
-			inputSerializer,
-			isProcessingTime,
-			nfaFactory,
-			comparator,
-			skipStrategy,
-			new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
-			lateDataOutputTag);
-		this.timedOutOutputTag = outputTag;
-	}
-
-	@Override
-	protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			output.collect(new StreamRecord<>(getUserFunction().getFlatSelectFunction().select(match), timestamp));
-		}
-	}
-
-	@Override
-	protected void processTimedOutSequences(
-		Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
-		for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
-			output.collect(timedOutOutputTag,
-				new StreamRecord<>(
-					getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1),
-					timestamp));
-		}
-	}
-
-	/**
-	 * Wrapper that enables storing {@link PatternSelectFunction} and {@link PatternTimeoutFunction} in one udf.
-	 *
-	 * @param <IN> Type of the input elements
-	 * @param <OUT1> Type of the output elements
-	 * @param <OUT2> Type of the timed out output elements
-	 */
-	@Internal
-	public static class SelectWrapper<IN, OUT1, OUT2> implements Function {
-
-		private static final long serialVersionUID = -8320546120157150202L;
-
-		private PatternSelectFunction<IN, OUT1> flatSelectFunction;
-		private PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction;
-
-		PatternSelectFunction<IN, OUT1> getFlatSelectFunction() {
-			return flatSelectFunction;
-		}
-
-		PatternTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() {
-			return flatTimeoutFunction;
-		}
-
-		public SelectWrapper(
-			PatternSelectFunction<IN, OUT1> flatSelectFunction,
-			PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction) {
-			this.flatSelectFunction = flatSelectFunction;
-			this.flatTimeoutFunction = flatTimeoutFunction;
-		}
-	}
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
deleted file mode 100644
index 5336543..0000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
- * Before giving the {@link TimestampedSideOutputCollector} to a user function you must set
- * the timestamp that should be attached to emitted elements. Most operators
- * would set the timestamp of the incoming
- * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
- *
- * <p>This version emits results into a SideOutput specified by given {@link OutputTag}
- *
- * @param <T> The type of the elements that can be emitted.
- */
-@Internal
-public class TimestampedSideOutputCollector<T> implements Collector<T> {
-
-	private final Output<?> output;
-
-	private final StreamRecord<T> reuse;
-
-	private final OutputTag<T> outputTag;
-
-	/**
-	 * Creates a new {@link TimestampedSideOutputCollector} that wraps the given {@link Output} and collects
-	 * results into sideoutput corresponding to {@link OutputTag}.
-	 */
-	public TimestampedSideOutputCollector(OutputTag<T> outputTag, Output<?> output) {
-		this.output = output;
-		this.outputTag = outputTag;
-		this.reuse = new StreamRecord<T>(null);
-	}
-
-	@Override
-	public void collect(T record) {
-		output.collect(outputTag, reuse.replace(record));
-	}
-
-	public void setTimestamp(StreamRecord<?> timestampBase) {
-		if (timestampBase.hasTimestamp()) {
-			reuse.setTimestamp(timestampBase.getTimestamp());
-		} else {
-			reuse.eraseTimestamp();
-		}
-	}
-
-	public void setAbsoluteTimestamp(long timestamp) {
-		reuse.setTimestamp(timestamp);
-	}
-
-	public void eraseTimestamp() {
-		reuse.eraseTimestamp();
-	}
-
-	@Override
-	public void close() {
-		output.close();
-	}
-}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 3f249a8..2e4b41d 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -26,9 +26,9 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.PatternTimeoutFunction;
 import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 
@@ -69,7 +70,7 @@ import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.validateMockitoUsage;
 
 /**
- * Tests for {@link AbstractKeyedCEPPatternOperator}.
+ * Tests for {@link CepOperator}.
  */
 public class CEPOperatorTest extends TestLogger {
 
@@ -254,32 +255,14 @@ public class CEPOperatorTest extends TestLogger {
 			new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timedOut") {};
 		final KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
 			new KeyedOneInputStreamOperatorTestHarness<>(
-				new SelectTimeoutCepOperator<>(
+				new CepOperator<>(
 					Event.createTypeSerializer(),
 					false,
 					new NFAFactory(true),
 					null,
 					null,
-					new PatternSelectFunction<Event, Map<String, List<Event>>>() {
-						private static final long serialVersionUID = -5768297287711394420L;
-
-						@Override
-						public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
-							return pattern;
-						}
-					},
-					new PatternTimeoutFunction<Event, Tuple2<Map<String, List<Event>>, Long>>() {
-						private static final long serialVersionUID = 2843329425823093249L;
-
-						@Override
-						public Tuple2<Map<String, List<Event>>, Long> timeout(
-							Map<String, List<Event>> pattern,
-							long timeoutTimestamp) throws Exception {
-							return Tuple2.of(pattern, timeoutTimestamp);
-						}
-					},
-					timedOut
-				, null), new KeySelector<Event, Integer>() {
+					new TimedOutProcessFunction(timedOut),
+					null), new KeySelector<Event, Integer>() {
 				private static final long serialVersionUID = 7219185117566268366L;
 
 				@Override
@@ -334,7 +317,7 @@ public class CEPOperatorTest extends TestLogger {
 	@Test
 	public void testKeyedCEPOperatorNFAUpdate() throws Exception {
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
@@ -393,7 +376,7 @@ public class CEPOperatorTest extends TestLogger {
 		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
@@ -455,7 +438,7 @@ public class CEPOperatorTest extends TestLogger {
 
 	@Test
 	public void testKeyedCEPOperatorNFAUpdateTimes() throws Exception {
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
@@ -497,7 +480,7 @@ public class CEPOperatorTest extends TestLogger {
 		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
@@ -548,7 +531,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(false);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(false);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -593,7 +576,7 @@ public class CEPOperatorTest extends TestLogger {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(false);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(false);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -646,7 +629,7 @@ public class CEPOperatorTest extends TestLogger {
 		Event middle1Event3 = new Event(41, "a", 4.0);
 		Event middle2Event1 = new Event(41, "b", 5.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			false,
 			new ComplexNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
@@ -738,7 +721,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data", TypeInformation.of(Event.class));
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			false,
 			new ComplexNFAFactory(),
 			null,
@@ -784,7 +767,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(true);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(true);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -812,7 +795,7 @@ public class CEPOperatorTest extends TestLogger {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(true);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(true);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -904,7 +887,7 @@ public class CEPOperatorTest extends TestLogger {
 			}
 		});
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			false,
 			new NFACompiler.NFAFactory<Event>() {
 				private static final long serialVersionUID = 477082663248051994L;
@@ -975,7 +958,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(true);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(true);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -1003,7 +986,7 @@ public class CEPOperatorTest extends TestLogger {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -1032,7 +1015,7 @@ public class CEPOperatorTest extends TestLogger {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -1064,7 +1047,7 @@ public class CEPOperatorTest extends TestLogger {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -1101,12 +1084,12 @@ public class CEPOperatorTest extends TestLogger {
 		assertEquals(end, patternMap.get("end").get(0));
 	}
 
-	private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperator(
+	private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperator(
 		boolean isProcessingTime) {
 		return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new NFAFactory());
 	}
 
-	private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperatorWithComparator(
+	private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperatorWithComparator(
 		boolean isProcessingTime) {
 
 		return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new NFAFactory(), new org.apache.flink.cep.EventComparator<Event>() {
@@ -1176,7 +1159,7 @@ public class CEPOperatorTest extends TestLogger {
 		return CepOperatorTestUtilities.getCepTestHarness(getKeyedCepOpearator(isProcessingTime));
 	}
 
-	private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator(boolean isProcessingTime) {
+	private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator(boolean isProcessingTime) {
 		return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new CEPOperatorTest.NFAFactory());
 	}
 
@@ -1323,4 +1306,29 @@ public class CEPOperatorTest extends TestLogger {
 			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
+
+	private static class TimedOutProcessFunction extends PatternProcessFunction<Event, Map<String, List<Event>>>
+		implements TimedOutPartialMatchHandler<Event> {
+
+		private final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOutTag;
+
+		private TimedOutProcessFunction(OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOutTag) {
+			this.timedOutTag = timedOutTag;
+		}
+
+		@Override
+		public void processMatch(
+			Map<String, List<Event>> match,
+			PatternProcessFunction.Context ctx,
+			Collector<Map<String, List<Event>>> out) throws Exception {
+			out.collect(match);
+		}
+
+		@Override
+		public void processTimedOutMatch(
+			Map<String, List<Event>> match,
+			PatternProcessFunction.Context ctx) throws Exception {
+			ctx.output(timedOutTag, Tuple2.of(match, ctx.timestamp()));
+		}
+	}
 }
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
index abc4b18..2b4bba1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
@@ -22,17 +22,18 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
 import java.util.List;
 import java.util.Map;
 
 /**
- * Utility methods for creating test {@link AbstractKeyedCEPPatternOperator}.
+ * Utility methods for creating a test {@link CepOperator}.
  */
 public class CepOperatorTestUtilities {
 
@@ -46,8 +47,8 @@ public class CepOperatorTestUtilities {
 		}
 	}
 
-	public static OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> cepOperator) throws Exception {
+	public static <T> OneInputStreamOperatorTestHarness<Event, T> getCepTestHarness(
+		CepOperator<Event, Integer, T> cepOperator) throws Exception {
 		KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
 		return new KeyedOneInputStreamOperatorTestHarness<>(
@@ -56,14 +57,14 @@ public class CepOperatorTestUtilities {
 			BasicTypeInfo.INT_TYPE_INFO);
 	}
 
-	public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
+	public static <K> CepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
 		boolean isProcessingTime,
 		NFACompiler.NFAFactory<Event> nfaFactory) {
 
 		return getKeyedCepOpearator(isProcessingTime, nfaFactory, null);
 	}
 
-	public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
+	public static <K> CepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
 			boolean isProcessingTime,
 			NFACompiler.NFAFactory<Event> nfaFactory,
 			EventComparator<Event> comparator) {
@@ -71,26 +72,30 @@ public class CepOperatorTestUtilities {
 		return getKeyedCepOpearator(isProcessingTime, nfaFactory, comparator, null);
 	}
 
-	public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
+	public static <K> CepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
 			boolean isProcessingTime,
 			NFACompiler.NFAFactory<Event> nfaFactory,
 			EventComparator<Event> comparator,
 			OutputTag<Event> outputTag) {
 
-		return new SelectCepOperator<>(
+		return new CepOperator<>(
 			Event.createTypeSerializer(),
 			isProcessingTime,
 			nfaFactory,
 			comparator,
 			null,
-			new PatternSelectFunction<Event, Map<String, List<Event>>>() {
+			new PatternProcessFunction<Event, Map<String, List<Event>>>() {
 				private static final long serialVersionUID = -7143807777582726991L;
 
 				@Override
-				public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
-					return pattern;
+				public void processMatch(
+						Map<String, List<Event>> match,
+						Context ctx,
+						Collector<Map<String, List<Event>>> out) throws Exception {
+					out.collect(match);
 				}
-			}, outputTag);
+			},
+			outputTag);
 	}
 
 	private CepOperatorTestUtilities() {
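
The test utilities above construct a CepOperator directly with an inline PatternProcessFunction; in a job the same function would be attached through the pattern API. A minimal end-to-end sketch, assuming the reworked PatternStream exposes a process(PatternProcessFunction) method as introduced by these commits (stream contents and names are illustrative):

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

// Hedged usage sketch only; not part of this patch.
public class CepProcessSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// tiny in-memory input keeps the sketch self-contained
		DataStream<String> input = env.fromElements("start", "noise", "end");

		Pattern<String, ?> pattern = Pattern.<String>begin("start")
			.where(new SimpleCondition<String>() {
				@Override
				public boolean filter(String value) {
					return value.equals("start");
				}
			})
			.followedBy("end")
			.where(new SimpleCondition<String>() {
				@Override
				public boolean filter(String value) {
					return value.equals("end");
				}
			});

		// single entry point replacing the select/flatSelect variants
		DataStream<String> matches = CEP.pattern(input, pattern)
			.process(new PatternProcessFunction<String, String>() {
				@Override
				public void processMatch(Map<String, List<String>> match, Context ctx, Collector<String> out) {
					out.collect(match.get("start").get(0) + " -> " + match.get("end").get(0));
				}
			});

		matches.print();
		env.execute("cep-pattern-process-sketch");
	}
}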
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
new file mode 100644
index 0000000..f35d08e
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.cep.operator.CepOperatorTestUtilities.getCepTestHarness;
+import static org.apache.flink.cep.utils.EventBuilder.event;
+import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
+
+/**
+ * Tests for {@link CepOperator} which check that {@link PatternProcessFunction.Context} is set up properly.
+ */
+public class CepProcessFunctionContextTest extends TestLogger {
+
+	@Test
+	public void testTimestampPassingInEventTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(1),
+					new NFAForwardingFactory(),
+					false))) {
+			harness.open();
+
+			// events arrive out of order to verify that internal sorting does not mess up the timestamps
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+
+			harness.processWatermark(6);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("3:B")
+				.nextElementEquals("5:A")
+				.watermarkEquals(6L)
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testTimestampPassingInProcessingTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(1),
+					new NFAForwardingFactory(),
+					true))) {
+			harness.open();
+
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("(NO_TIMESTAMP):A")
+				.nextElementEquals("(NO_TIMESTAMP):B")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeInProcessingTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(1),
+					new NFAForwardingFactory(),
+					true))) {
+			harness.open();
+
+			harness.setProcessingTime(15);
+			harness.processElement(event().withName("A").asStreamRecord());
+			harness.setProcessingTime(35);
+			harness.processElement(event().withName("B").asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("15:A")
+				.nextElementEquals("35:B")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeInEventTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(1),
+					new NFAForwardingFactory(),
+					false))) {
+			harness.open();
+
+			harness.setProcessingTime(10);
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.setProcessingTime(100);
+			harness.processWatermark(6);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("100:A")
+				.watermarkEquals(6)
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testTimestampPassingForTimedOutInEventTime() throws Exception {
+
+		OutputTag<String> timedOut = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(2, timedOut),
+					new NFATimingOutFactory(),
+					false))) {
+			harness.open();
+
+			// events arrive out of order to verify that internal sorting does not mess up the timestamps
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("C").withTimestamp(20).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+
+			harness.processWatermark(22);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("5:B:A")
+				.watermarkEquals(22)
+				.hasNoMoreElements();
+
+			assertOutput(harness.getSideOutput(timedOut))
+				.nextElementEquals("15:A")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testTimestampPassingForTimedOutInProcessingTime() throws Exception {
+
+		OutputTag<String> timedOut = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(2, timedOut),
+					new NFATimingOutFactory(),
+					true))) {
+			harness.open();
+
+			harness.setProcessingTime(3);
+			harness.processElement(event().withName("A").withTimestamp(3).asStreamRecord());
+			harness.setProcessingTime(5);
+			harness.processElement(event().withName("C").withTimestamp(5).asStreamRecord());
+			harness.setProcessingTime(20);
+			harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("(NO_TIMESTAMP):A:C")
+				.hasNoMoreElements();
+
+			assertOutput(harness.getSideOutput(timedOut))
+				.nextElementEquals("(NO_TIMESTAMP):C")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeForTimedOutInEventTime() throws Exception {
+
+		OutputTag<String> sideOutputTag = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
+					new NFATimingOutFactory(),
+					false))) {
+			harness.open();
+
+			// events arrive out of order to verify that internal sorting does not mess up the timestamps
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
+			harness.processElement(event().withName("C").withTimestamp(3).asStreamRecord());
+
+			harness.setProcessingTime(100);
+			harness.processWatermark(22);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("100:C:A")
+				.watermarkEquals(22)
+				.hasNoMoreElements();
+
+			assertOutput(harness.getSideOutput(sideOutputTag))
+				.nextElementEquals("100:A")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeForTimedOutInProcessingTime() throws Exception {
+
+		OutputTag<String> sideOutputTag = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
+					new NFATimingOutFactory(),
+					true))) {
+			harness.open();
+
+			harness.setProcessingTime(3);
+			harness.processElement(event().withName("A").asStreamRecord());
+			harness.setProcessingTime(5);
+			harness.processElement(event().withName("B").asStreamRecord());
+			harness.setProcessingTime(20);
+			harness.processElement(event().withName("C").asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("5:A:B")
+				.hasNoMoreElements();
+
+			// in processing time we currently time out only when the next element arrives, hence the 20
+			assertOutput(harness.getSideOutput(sideOutputTag))
+				.nextElementEquals("20:B")
+				.hasNoMoreElements();
+		}
+	}
+
+	/* TEST UTILS */
+
+	private <T> CepOperator<Event, Integer, T> createCepOperator(
+			PatternProcessFunction<Event, T> processFunction,
+			NFACompiler.NFAFactory<Event> nfaFactory,
+			boolean isProcessingTime) throws Exception {
+		return new CepOperator<>(
+			Event.createTypeSerializer(),
+			isProcessingTime,
+			nfaFactory,
+			null,
+			null,
+			processFunction,
+			null);
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[timestamp]:[Event.getName]...</pre> where Event.getName occurs up to stateNumber times. If the match
+	 * does not contain the n-th pattern, that position is omitted.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) {
+		return new AccessContextWithNames(stateNumber,
+			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[timestamp]:[Event.getName]...</pre> where Event.getName occurs up to stateNumber times. If the match
+	 * does not contain the n-th pattern, that position is omitted.
+	 *
+	 * <p>This function also applies the same logic to timed out partial matches and emits those results into
+	 * the side output identified by the given output tag.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @param timedOutTag output tag where to emit timed out partial matches
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractTimestampAndNames(
+			int stateNumber,
+			OutputTag<String> timedOutTag) {
+		return new AccessContextWithNamesWithTimedOut(stateNumber,
+			timedOutTag,
+			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[currentProcessingTime]:[Event.getName]...</pre> where Event.getName occurs up to stateNumber times.
+	 * If the match does not contain the n-th pattern, that position is omitted.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractCurrentProcessingTimeAndNames(int stateNumber) {
+		return new AccessContextWithNames(stateNumber, context -> String.valueOf(context.currentProcessingTime()));
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[currentProcessingTime]:[Event.getName]...</pre> where Event.getName occurs up to stateNumber times.
+	 * If the match does not contain the n-th pattern, that position is omitted.
+	 *
+	 * <p>This function also applies the same logic to timed out partial matches and emits those results into
+	 * the side output identified by the given output tag.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @param timedOutTag output tag where to emit timed out partial matches
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractCurrentProcessingTimeAndNames(
+			int stateNumber,
+			OutputTag<String> timedOutTag) {
+		return new AccessContextWithNamesWithTimedOut(stateNumber,
+			timedOutTag,
+			context -> String.valueOf(context.currentProcessingTime()));
+	}
+
+	private static final String NO_TIMESTAMP = "(NO_TIMESTAMP)";
+
+	static class AccessContextWithNames extends PatternProcessFunction<Event, String> {
+
+		private final int stateCount;
+		private final Function<Context, String> contextAccessor;
+
+		AccessContextWithNames(int stateCount, Function<Context, String> contextAccessor) {
+			this.stateCount = stateCount;
+			this.contextAccessor = contextAccessor;
+		}
+
+		@Override
+		public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception {
+			out.collect(extractResult(match, ctx));
+		}
+
+		String extractResult(Map<String, List<Event>> match, Context ctx) {
+			StringBuilder stringBuilder = new StringBuilder(contextAccessor.apply(ctx));
+			for (int i = 1; i <= stateCount; i++) {
+				List<Event> events = match.get("" + i);
+				if (events != null) {
+					stringBuilder.append(":").append(events.get(0).getName());
+				}
+			}
+			return stringBuilder.toString();
+		}
+	}
+
+	static final class AccessContextWithNamesWithTimedOut extends AccessContextWithNames
+		implements TimedOutPartialMatchHandler<Event> {
+
+		private final OutputTag<String> outputTag;
+
+		AccessContextWithNamesWithTimedOut(
+				int stateCount,
+				OutputTag<String> outputTag,
+				Function<Context, String> contextAccessor) {
+			super(stateCount, contextAccessor);
+			this.outputTag = outputTag;
+		}
+
+		@Override
+		public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception {
+			ctx.output(outputTag, extractResult(match, ctx));
+		}
+	}
+
+	/**
+	 * This NFA consists of one state accepting any element.
+	 */
+	private static class NFAForwardingFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.begin("1");
+
+			return NFACompiler.compileFactory(pattern, false).createNFA();
+		}
+	}
+
+	/**
+	 * This NFA consists of two states accepting any element. It times out after 10 milliseconds.
+	 */
+	private static class NFATimingOutFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("1").next("2").within(Time.milliseconds(10));
+
+			return NFACompiler.compileFactory(pattern, true).createNFA();
+		}
+	}
+
+}
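
Taken together, the helpers above compose into short, readable tests. Below is a minimal sketch of an additional processing-time test, written as if it were another method of CepProcessFunctionContextTest; the test name is illustrative, and it assumes the getCepTestHarness helper from CepOperatorTestUtilities as well as the event() and assertOutput() utilities from this change set.

	@Test
	public void sketchProcessingTimeMatch() throws Exception {
		// two-state pattern with a 10 ms timeout, evaluated in processing time
		CepOperator<Event, Integer, String> operator = createCepOperator(
			extractCurrentProcessingTimeAndNames(2),
			new NFATimingOutFactory(),
			true);

		try (OneInputStreamOperatorTestHarness<Event, String> harness =
				CepOperatorTestUtilities.getCepTestHarness(operator)) {
			harness.open();

			harness.setProcessingTime(4);
			harness.processElement(event().withName("A").asStreamRecord());
			harness.setProcessingTime(7);
			harness.processElement(event().withName("B").asStreamRecord());

			// A followed by B within 10 ms completes the pattern at processing time 7
			assertOutput(harness.getOutput())
				.nextElementEquals("7:A:B")
				.hasNoMoreElements();
		}
	}
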
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/CepOperatorBuilder.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/CepOperatorBuilder.java
new file mode 100644
index 0000000..ef05446
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/CepOperatorBuilder.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test builder for a {@link CepOperator} that accepts {@link Event} as input elements.
+ *
+ * @param <OUT> type of output elements
+ */
+public class CepOperatorBuilder<OUT> {
+
+	private final boolean isProcessingTime;
+	private final NFACompiler.NFAFactory<Event> nfaFactory;
+	private final EventComparator<Event> comparator;
+	private final AfterMatchSkipStrategy skipStrategy;
+	private final PatternProcessFunction<Event, OUT> function;
+	private final OutputTag<Event> lateDataOutputTag;
+
+	public static CepOperatorBuilder<Map<String, List<Event>>> createOperatorForNFA(NFA<Event> nfa) {
+		return new CepOperatorBuilder<>(
+			true,
+			new NFACompiler.NFAFactory<Event>() {
+				@Override
+				public NFA<Event> createNFA() {
+					return nfa;
+				}
+			},
+			null,
+			null,
+			new PatternProcessFunction<Event, Map<String, List<Event>>>() {
+				private static final long serialVersionUID = -7143807777582726991L;
+
+				@Override
+				public void processMatch(
+					Map<String, List<Event>> match,
+					Context ctx,
+					Collector<Map<String, List<Event>>> out) throws Exception {
+					out.collect(match);
+				}
+			},
+			null);
+	}
+
+	public static CepOperatorBuilder<Map<String, List<Event>>> createOperatorForNFAFactory(NFACompiler.NFAFactory<Event> nfaFactory) {
+		return new CepOperatorBuilder<>(
+			true,
+			nfaFactory,
+			null,
+			null,
+			new PatternProcessFunction<Event, Map<String, List<Event>>>() {
+				private static final long serialVersionUID = -7143807777582726991L;
+
+				@Override
+				public void processMatch(
+					Map<String, List<Event>> match,
+					Context ctx,
+					Collector<Map<String, List<Event>>> out) throws Exception {
+					out.collect(match);
+				}
+			},
+			null);
+	}
+
+	private CepOperatorBuilder(
+		boolean isProcessingTime,
+		NFACompiler.NFAFactory<Event> nfaFactory,
+		EventComparator<Event> comparator,
+		AfterMatchSkipStrategy skipStrategy,
+		PatternProcessFunction<Event, OUT> processFunction,
+		OutputTag<Event> lateDataOutputTag) {
+		this.isProcessingTime = isProcessingTime;
+		this.nfaFactory = nfaFactory;
+		this.comparator = comparator;
+		this.skipStrategy = skipStrategy;
+		function = processFunction;
+		this.lateDataOutputTag = lateDataOutputTag;
+	}
+
+	public CepOperatorBuilder<OUT> inProcessingTime() {
+		return new CepOperatorBuilder<>(
+			true,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> inEventTime() {
+		return new CepOperatorBuilder<>(
+			false,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withComparator(EventComparator<Event> comparator) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withSkipStrategy(AfterMatchSkipStrategy skipStrategy) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withLateDataOutputTag(OutputTag<Event> lateDataOutputTag) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withNFA(NFA<Event> nfa) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			new NFACompiler.NFAFactory<Event>() {
+				@Override
+				public NFA<Event> createNFA() {
+					return nfa;
+				}
+			},
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public <T> CepOperatorBuilder<T> withFunction(PatternProcessFunction<Event, T> processFunction) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			processFunction,
+			lateDataOutputTag);
+	}
+
+	public <K> CepOperator<Event, K, OUT> build() {
+		return new CepOperator<>(Event.createTypeSerializer(),
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+}
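
For reference, a usage sketch of the builder under illustrative assumptions (the "start" pattern, the lateDataTag name, and the variable names are placeholders, not part of this change):

	// build an event-time CEP operator that routes late events to a side output
	OutputTag<Event> lateDataTag = new OutputTag<Event>("late-events") {};

	CepOperator<Event, Integer, Map<String, List<Event>>> operator =
		CepOperatorBuilder.createOperatorForNFAFactory(
				NFACompiler.compileFactory(Pattern.<Event>begin("start"), false))
			.inEventTime()
			.withLateDataOutputTag(lateDataTag)
			.build();

The resulting operator can then be wrapped in a test harness, for example via CepOperatorTestUtilities, in the same way the CEP operator tests in this change set do.
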
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java
index 63cfa15..7457331 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java
@@ -29,7 +29,7 @@ import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
- * Asserter for output from {@link OneInputStreamOperatorTestHarness}
+ * Asserter for output from {@link OneInputStreamOperatorTestHarness}.
  */
 public class OutputAsserter {