Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2019/01/02 07:13:21 UTC

[GitHub] leesf closed pull request #7360: [FLINK-11217] Back to top button is missing in the Joining document a…

URL: https://github.com/apache/flink/pull/7360

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/docs/_includes/generated/rest_v1_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
index 4977e5ccfeb..2fd3dcfe40c 100644
--- a/docs/_includes/generated/rest_v1_dispatcher.html
+++ b/docs/_includes/generated/rest_v1_dispatcher.html
@@ -3871,3 +3871,5 @@
     </tr>
   </tbody>
 </table>
+
+{% top %}
diff --git a/docs/dev/connectors/index.md b/docs/dev/connectors/index.md
index 28ab2d34635..b5405d4d06b 100644
--- a/docs/dev/connectors/index.md
+++ b/docs/dev/connectors/index.md
@@ -82,3 +82,5 @@ If the data involved has many fewer reads than writes, a better approach can be
 for an external application to pull from Flink the data it needs.
 The [Queryable State]({{ site.baseurl }}/dev/stream/state/queryable_state.html) interface
 enables this by allowing the state being managed by Flink to be queried on demand.
+
+{% top %}
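
For context, the Queryable State interface described in this hunk is consumed from an external application roughly as follows. This is a minimal sketch, assuming a running job with a value state made queryable under the name "query-name"; the host, port, job ID, and key are placeholders:

{% highlight java %}
import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.queryablestate.client.QueryableStateClient;

// Connect to the queryable state proxy on a TaskManager (default port 9069).
QueryableStateClient client = new QueryableStateClient("taskmanager-host", 9069);

// The descriptor must match the one used when the state was made queryable.
ValueStateDescriptor<Long> descriptor =
    new ValueStateDescriptor<>("count", Long.class);

CompletableFuture<ValueState<Long>> future = client.getKvState(
    JobID.fromHexString("00000000000000000000000000000000"), // placeholder job ID
    "query-name",                    // name the state was registered under
    "some-key",                      // key to look up
    BasicTypeInfo.STRING_TYPE_INFO,  // type information of the key
    descriptor);

Long value = future.get().value();  // blocks until the state is fetched
{% endhighlight %}
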
diff --git a/docs/dev/execution_configuration.md b/docs/dev/execution_configuration.md
index f0103b0f39f..fc2364bdfa0 100644
--- a/docs/dev/execution_configuration.md
+++ b/docs/dev/execution_configuration.md
@@ -51,7 +51,7 @@ With the closure cleaner disabled, it might happen that an anonymous user functi
 
 - `getNumberOfExecutionRetries()` / `setNumberOfExecutionRetries(int numberOfExecutionRetries)` Sets the number of times that failed tasks are re-executed. A value of zero effectively disables fault tolerance. A value of `-1` indicates that the system default value (as defined in the configuration) should be used. This is deprecated, use [restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) instead.
 
-- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully been stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more. This is deprecated, use [restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) instead.
+- `getExecutionRetryDelay()` / `setExecutionRetryDelay(long executionRetryDelay)` Sets the delay in milliseconds that the system waits after a job has failed, before re-executing it. The delay starts after all tasks have been successfully stopped on the TaskManagers, and once the delay is past, the tasks are re-started. This parameter is useful to delay re-execution in order to let certain time-out related failures surface fully (like broken connections that have not fully timed out), before attempting a re-execution and immediately failing again due to the same problem. This parameter only has an effect if the number of execution re-tries is one or more. This is deprecated, use [restart strategies]({{ site.baseurl }}/dev/restart_strategies.html) instead.
 
 - `getExecutionMode()` / `setExecutionMode()`. The default execution mode is PIPELINED. Sets the execution mode to execute the program. The execution mode defines whether data exchanges are performed in a batch or in a pipelined manner.
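
The restart strategies that supersede the two deprecated methods above are configured on the execution environment. A minimal sketch, assuming a fixed-delay strategy with arbitrary values:

{% highlight java %}
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Re-execute a failed job at most 3 times, waiting 10 seconds between attempts
// (replaces setNumberOfExecutionRetries / setExecutionRetryDelay).
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
    3,
    Time.of(10, TimeUnit.SECONDS)));
{% endhighlight %}
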
 
diff --git a/docs/dev/java_lambdas.md b/docs/dev/java_lambdas.md
index 0b4a5cc069d..bc9704b517d 100644
--- a/docs/dev/java_lambdas.md
+++ b/docs/dev/java_lambdas.md
@@ -135,4 +135,6 @@ public static class DoubleTuple extends Tuple2<Integer, Integer> {
         this.f1 = f1;
     }
 }
-{% endhighlight %}
\ No newline at end of file
+{% endhighlight %}
+
+{% top %}
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ce9a9c3d72b..0519dbc3662 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -101,14 +101,16 @@ Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(
 
 PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
-DataStream<Alert> result = patternStream.select(
-    new PatternSelectFunction<Event, Alert>() {
+DataStream<Alert> result = patternStream.process(
+    new PatternProcessFunction<Event, Alert>() {
         @Override
-        public Alert select(Map<String, List<Event>> pattern) throws Exception {
-            return createAlertFrom(pattern);
+        public void processMatch(
+                Map<String, List<Event>> pattern,
+                Context ctx,
+                Collector<Alert> out) throws Exception {
+            out.collect(createAlertFrom(pattern));
         }
-    }
-});
+    });
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
@@ -121,7 +123,15 @@ val pattern = Pattern.begin[Event]("start").where(_.getId == 42)
 
 val patternStream = CEP.pattern(input, pattern)
 
-val result: DataStream[Alert] = patternStream.select(createAlert(_))
+val result: DataStream[Alert] = patternStream.process(
+    new PatternProcessFunction[Event, Alert]() {
+        override def processMatch(
+              `match`: util.Map[String, util.List[Event]],
+              ctx: PatternProcessFunction.Context,
+              out: Collector[Alert]): Unit = {
+            out.collect(createAlertFrom(`match`))
+        }
+    })
 {% endhighlight %}
 </div>
 </div>
@@ -1477,92 +1487,61 @@ The input stream can be *keyed* or *non-keyed* depending on your use-case.
 
 ### Selecting from Patterns
 
-Once you have obtained a `PatternStream` you can select from detected event sequences via the `select` or `flatSelect` methods.
+Once you have obtained a `PatternStream` you can apply transformations to the detected event sequences. The suggested way of doing so
+is with a `PatternProcessFunction`.
 
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-The `select()` method requires a `PatternSelectFunction` implementation.
-A `PatternSelectFunction` has a `select` method which is called for each matching event sequence.
+A `PatternProcessFunction` has a `processMatch` method which is called for each matching event sequence.
 It receives a match in the form of `Map<String, List<IN>>` where the key is the name of each pattern in your pattern
 sequence and the value is a list of all accepted events for that pattern (`IN` is the type of your input elements).
 The events for a given pattern are ordered by timestamp. The reason for returning a list of accepted events for each
-pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result.
+pattern is that when using looping patterns (e.g. `oneOrMore()` and `times()`), more than one event may be accepted for a given pattern.
 
 {% highlight java %}
-class MyPatternSelectFunction<IN, OUT> implements PatternSelectFunction<IN, OUT> {
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
     @Override
-    public OUT select(Map<String, List<IN>> pattern) {
-        IN startEvent = pattern.get("start").get(0);
-        IN endEvent = pattern.get("end").get(0);
-        return new OUT(startEvent, endEvent);
+    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+        IN startEvent = match.get("start").get(0);
+        IN endEvent = match.get("end").get(0);
+        out.collect(new OUT(startEvent, endEvent));
     }
 }
 {% endhighlight %}
 
-A `PatternFlatSelectFunction` is similar to the `PatternSelectFunction`, with the only distinction that it can return an
-arbitrary number of results. To do this, the `select` method has an additional `Collector` parameter which is
-used to forward your output elements downstream.
+The `PatternProcessFunction` gives access to a `Context` object. Through it, one can access time-related
+characteristics such as the `currentProcessingTime` or the `timestamp` of the current match (which is the timestamp of the last element assigned to the match).
+Through this context one can also emit results to a [side-output]({{ site.baseurl }}/dev/stream/side_output.html).
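
For illustration, a `processMatch` body that uses this `Context` could look like the following sketch; the `Alert` type, the `createAlertFrom` helper, and the `lateAlerts` tag are assumptions:

{% highlight java %}
private final OutputTag<Alert> lateAlerts = new OutputTag<Alert>("late-alerts") {};

@Override
public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Alert> out) {
    Alert alert = createAlertFrom(match);
    // timestamp() is the timestamp of the last element assigned to the match;
    // currentProcessingTime() is the current wall-clock time of the operator.
    if (ctx.timestamp() != null && ctx.timestamp() < ctx.currentProcessingTime() - 60_000L) {
        ctx.output(lateAlerts, alert);  // emit to a side output instead
    } else {
        out.collect(alert);
    }
}
{% endhighlight %}
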
 
-{% highlight java %}
-class MyPatternFlatSelectFunction<IN, OUT> implements PatternFlatSelectFunction<IN, OUT> {
-    @Override
-    public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> collector) {
-        IN startEvent = pattern.get("start").get(0);
-        IN endEvent = pattern.get("end").get(0);
 
-        for (int i = 0; i < startEvent.getValue(); i++ ) {
-            collector.collect(new OUT(startEvent, endEvent));
-        }
-    }
-}
-{% endhighlight %}
-</div>
-
-<div data-lang="scala" markdown="1">
-The `select()` method takes a selection function as argument, which is called for each matching event sequence.
-It receives a match in the form of `Map[String, Iterable[IN]]` where the key is the name of each pattern in your pattern
-sequence and the value is an Iterable over all accepted events for that pattern (`IN` is the type of your input elements).
+#### Handling Timed Out Partial Patterns
 
-The events for a given pattern are ordered by timestamp. The reason for returning an iterable of accepted events for each pattern is that when using looping patterns (e.g. `oneToMany()` and `times()`), more than one event may be accepted for a given pattern. The selection function returns exactly one result per call.
-
-{% highlight scala %}
-def selectFn(pattern : Map[String, Iterable[IN]]): OUT = {
-    val startEvent = pattern.get("start").get.head
-    val endEvent = pattern.get("end").get.head
-    OUT(startEvent, endEvent)
-}
-{% endhighlight %}
+Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
+are discarded because they exceed the window length. To act upon a timed out partial match one can use the `TimedOutPartialMatchHandler` interface.
+The interface is meant to be used in a mixin style, which means you can implement it in addition to your `PatternProcessFunction`.
+The `TimedOutPartialMatchHandler` provides the additional `processTimedOutMatch` method which will be called for every timed out partial match.
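
As a reminder, a window length is attached to a pattern with `within`; a minimal sketch, assuming an arbitrary two-step pattern:

{% highlight java %}
// Partial matches that do not complete within 10 seconds time out
// and are handed to processTimedOutMatch.
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .next("end")
    .within(Time.seconds(10));
{% endhighlight %}
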
 
-The `flatSelect` method is similar to the `select` method. Their only difference is that the function passed to the
-`flatSelect` method can return an arbitrary number of results per call. In order to do this, the function for
-`flatSelect` has an additional `Collector` parameter which is used to forward your output elements downstream.
+{% highlight java %}
+class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+    @Override
+    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+        ...
+    }
 
-{% highlight scala %}
-def flatSelectFn(pattern : Map[String, Iterable[IN]], collector : Collector[OUT]) = {
-    val startEvent = pattern.get("start").get.head
-    val endEvent = pattern.get("end").get.head
-    for (i <- 0 to startEvent.getValue) {
-        collector.collect(OUT(startEvent, endEvent))
+    @Override
+    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception {
+        IN startEvent = match.get("start").get(0);
+        ctx.output(outputTag, T(startEvent));
     }
 }
 {% endhighlight %}
-</div>
-</div>
 
-### Handling Timed Out Partial Patterns
+<span class="label label-info">Note</span> The `processTimedOutMatch` method does not give access to the main output. You can still emit results
+through [side-outputs]({{ site.baseurl }}/dev/stream/side_output.html) via the `Context` object.
 
-Whenever a pattern has a window length attached via the `within` keyword, it is possible that partial event sequences
-are discarded because they exceed the window length. To react to these timed out partial matches the `select`
-and `flatSelect` API calls allow you to specify a timeout handler. This timeout handler is called for each timed out
-partial event sequence. The timeout handler receives all the events that have been matched so far by the pattern, and
-the timestamp when the timeout was detected.
 
-To treat partial patterns, the `select` and `flatSelect` API calls offer an overloaded version which takes as
-parameters
+#### Convenience API
 
- * `PatternTimeoutFunction`/`PatternFlatTimeoutFunction`
- * [OutputTag]({{ site.baseurl }}/dev/stream/side_output.html) for the side output in which the timed out matches will be returned
- * and the known `PatternSelectFunction`/`PatternFlatSelectFunction`.
+The aforementioned `PatternProcessFunction` was introduced in Flink 1.8 and has since been the recommended way to interact with matches.
+One can still use the old-style APIs `select`/`flatSelect`, which internally will be translated into a `PatternProcessFunction`.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -1572,18 +1551,21 @@ PatternStream<Event> patternStream = CEP.pattern(input, pattern);
 
 OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
 
-SingleOutputStreamOperator<ComplexEvent> result = patternStream.select(
-    outputTag,
-    new PatternTimeoutFunction<Event, TimeoutEvent>() {...},
-    new PatternSelectFunction<Event, ComplexEvent>() {...}
-);
-
-DataStream<TimeoutEvent> timeoutResult = result.getSideOutput(outputTag);
-
 SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
     outputTag,
-    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {...},
-    new PatternFlatSelectFunction<Event, ComplexEvent>() {...}
+    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
+        public void timeout(
+                Map<String, List<Event>> pattern,
+                long timeoutTimestamp,
+                Collector<TimeoutEvent> out) throws Exception {
+            out.collect(new TimeoutEvent());
+        }
+    },
+    new PatternFlatSelectFunction<Event, ComplexEvent>() {
+        public void flatSelect(Map<String, List<Event>> pattern, Collector<ComplexEvent> out) throws Exception {
+            out.collect(new ComplexEvent());
+        }
+    }
 );
 
 DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);
@@ -1594,23 +1576,7 @@ DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag)
 <div data-lang="scala" markdown="1">
 
 {% highlight scala %}
-val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
-
-val outputTag = OutputTag[String]("side-output")
-
-val result: SingleOutputStreamOperator[ComplexEvent] = patternStream.select(outputTag){
-    (pattern: Map[String, Iterable[Event]], timestamp: Long) => TimeoutEvent()
-} {
-    pattern: Map[String, Iterable[Event]] => ComplexEvent()
-}
 
-val timeoutResult: DataStream<TimeoutEvent> = result.getSideOutput(outputTag)
-{% endhighlight %}
-
-The `flatSelect` API call offers the same overloaded version which takes as the first parameter a timeout function and as second parameter a selection function.
-In contrast to the `select` functions, the `flatSelect` functions are called with a `Collector`. You can use the collector to emit an arbitrary number of events.
-
-{% highlight scala %}
 val patternStream: PatternStream[Event] = CEP.pattern(input, pattern)
 
 val outputTag = OutputTag[String]("side-output")
diff --git a/docs/dev/libs/ml/index.md b/docs/dev/libs/ml/index.md
index fbe3dce3dc6..f8a75a45e50 100644
--- a/docs/dev/libs/ml/index.md
+++ b/docs/dev/libs/ml/index.md
@@ -146,3 +146,5 @@ If one wants to chain a `Predictor` to a `Transformer` or a set of chained `Tran
 The Flink community welcomes all contributors who want to get involved in the development of Flink and its libraries.
 In order to get quickly started with contributing to FlinkML, please read our official
 [contribution guide]({{site.baseurl}}/dev/libs/ml/contribution_guide.html).
+
+{% top %}
diff --git a/docs/dev/stream/experimental.md b/docs/dev/stream/experimental.md
index db029befb36..f0a78a53912 100644
--- a/docs/dev/stream/experimental.md
+++ b/docs/dev/stream/experimental.md
@@ -77,3 +77,5 @@ Code example:
       .addSink(new DiscardingSink[Int])
     env.execute()
 {% endhighlight %}
+
+{% top %}
diff --git a/docs/dev/stream/operators/joining.md b/docs/dev/stream/operators/joining.md
index c415f63f781..bdd8e427280 100644
--- a/docs/dev/stream/operators/joining.md
+++ b/docs/dev/stream/operators/joining.md
@@ -280,3 +280,5 @@ orangeStream
 
 </div>
 </div>
+
+{% top %}
diff --git a/docs/dev/stream/operators/process_function.md b/docs/dev/stream/operators/process_function.md
index b2a373eaa5f..3f60d6aae37 100644
--- a/docs/dev/stream/operators/process_function.md
+++ b/docs/dev/stream/operators/process_function.md
@@ -235,7 +235,6 @@ class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String
 </div>
 </div>
 
-{% top %}
 
 
 **NOTE:** Before Flink 1.4.0, when called from a processing-time timer, the `ProcessFunction.onTimer()` method sets
@@ -372,3 +371,5 @@ ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
 </div>
 
 <span class="label label-info">Note</span> Stopping a timer has no effect if no such timer with the given timestamp is registered.
+
+{% top %}
diff --git a/docs/dev/stream/state/broadcast_state.md b/docs/dev/stream/state/broadcast_state.md
index 628a6308b57..73f4dcf7fe4 100644
--- a/docs/dev/stream/state/broadcast_state.md
+++ b/docs/dev/stream/state/broadcast_state.md
@@ -275,3 +275,5 @@ manner.
 
   - **No RocksDB state backend:** Broadcast state is kept in-memory at runtime and memory provisioning should be done 
 accordingly. This holds for all operator states.
+
+{% top %}
diff --git a/docs/dev/stream/state/schema_evolution.md b/docs/dev/stream/state/schema_evolution.md
index f2f52b22ad7..fd0a73c3e1d 100644
--- a/docs/dev/stream/state/schema_evolution.md
+++ b/docs/dev/stream/state/schema_evolution.md
@@ -91,3 +91,5 @@ Flink fully supports evolving schema of Avro type state, as long as the schema c
 
 One limitation is that Avro generated classes used as the state type cannot be relocated or have different
 namespaces when the job is restored.
+
+{% top %}
diff --git a/docs/dev/table/common.md b/docs/dev/table/common.md
index 6c7f92b8d95..5d2521f4546 100644
--- a/docs/dev/table/common.md
+++ b/docs/dev/table/common.md
@@ -800,7 +800,7 @@ The mapping of a data type to a table schema can happen in two ways: **based on
 
 **Position-based Mapping**
 
-Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types *with a defined field order* as well as atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section).
+Position-based mapping can be used to give fields a more meaningful name while keeping the field order. This mapping is available for composite data types *with a defined field order* as well as atomic types. Composite data types such as tuples, rows, and case classes have such a field order. However, fields of a POJO must be mapped based on the field names (see next section). Fields can be projected out but can't be renamed using an alias `as`.
 
 When defining a position-based mapping, the specified names must not exist in the input data type, otherwise the API will assume that the mapping should happen based on the field names. If no field names are specified, the default field names and field order of the composite type are used or `f0` for atomic types. 
 
@@ -815,6 +815,9 @@ DataStream<Tuple2<Long, Integer>> stream = ...
 // convert DataStream into Table with default field names "f0" and "f1"
 Table table = tableEnv.fromDataStream(stream);
 
+// convert DataStream into Table with field "myLong" only
+Table table = tableEnv.fromDataStream(stream, "myLong");
+
 // convert DataStream into Table with field names "myLong" and "myInt"
 Table table = tableEnv.fromDataStream(stream, "myLong, myInt");
 {% endhighlight %}
@@ -830,8 +833,11 @@ val stream: DataStream[(Long, Int)] = ...
 // convert DataStream into Table with default field names "_1" and "_2"
 val table: Table = tableEnv.fromDataStream(stream)
 
+// convert DataStream into Table with field "myLong" only
+val table: Table = tableEnv.fromDataStream(stream, 'myLong)
+
 // convert DataStream into Table with field names "myLong" and "myInt"
-val table: Table = tableEnv.fromDataStream(stream, 'myLong 'myInt)
+val table: Table = tableEnv.fromDataStream(stream, 'myLong, 'myInt)
 {% endhighlight %}
 </div>
 </div>
diff --git a/docs/dev/table/streaming/index.md b/docs/dev/table/streaming/index.md
index 95c6bab8e0a..a6fa525a3db 100644
--- a/docs/dev/table/streaming/index.md
+++ b/docs/dev/table/streaming/index.md
@@ -40,3 +40,5 @@ Where to go next?
 * [Joins in Continuous Queries]({{ site.baseurl }}/dev/table/streaming/joins.html): Different supported types of Joins in Continuous Queries.
 * [Temporal Tables]({{ site.baseurl }}/dev/table/streaming/temporal_tables.html): Describes the Temporal Table concept.
 * [Query configuration]({{ site.baseurl }}/dev/table/streaming/query_configuration.html): Lists Table API & SQL specific configuration options.
+
+{% top %}
diff --git a/docs/dev/table/streaming/joins.md b/docs/dev/table/streaming/joins.md
index 508e8c70221..a572c1be293 100644
--- a/docs/dev/table/streaming/joins.md
+++ b/docs/dev/table/streaming/joins.md
@@ -221,3 +221,5 @@ applied updates according to the primary key until this point in time.
 By definition of event time, [watermarks]({{ site.baseurl }}/dev/event_time.html) allow the join operation to move
 forward in time and discard versions of the build table that are no longer necessary because no incoming row with
 lower or equal timestamp is expected.
+
+{% top %}
diff --git a/docs/dev/table/streaming/match_recognize.md b/docs/dev/table/streaming/match_recognize.md
index 33afc27e75a..4090440f864 100644
--- a/docs/dev/table/streaming/match_recognize.md
+++ b/docs/dev/table/streaming/match_recognize.md
@@ -960,3 +960,5 @@ Unsupported features include:
 * `MATCH_RECOGNIZE` is supported only for SQL. There is no equivalent in the Table API.
 * Aggregations:
   * distinct aggregations are not supported.
+  
+{% top %}
diff --git a/docs/dev/table/streaming/temporal_tables.md b/docs/dev/table/streaming/temporal_tables.md
index 4ebb4a6e8a7..d511c1f93f4 100644
--- a/docs/dev/table/streaming/temporal_tables.md
+++ b/docs/dev/table/streaming/temporal_tables.md
@@ -186,3 +186,5 @@ which allows us to use the function `rates` in the [Table API](../tableApi.html#
 
 Line `(2)` registers this function under the name `Rates` in our table environment,
 which allows us to use the `Rates` function in [SQL](../sql.html#joins).
+
+{% top %}
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index f2f6b7bcedd..44b8d431c8d 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -466,7 +466,7 @@ abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
     */
   def getValue(accumulator: ACC): T // MANDATORY
 
-  h/**
+  /**
     * Resets the accumulator for this [[AggregateFunction]]. This function must be implemented for
     * dataset grouping aggregate.
     *
diff --git a/docs/examples/index.md b/docs/examples/index.md
index a8472026b7c..f7242241167 100644
--- a/docs/examples/index.md
+++ b/docs/examples/index.md
@@ -45,4 +45,6 @@ There are also a few blog posts published online that discuss example applicatio
 
 * [Building real-time dashboard applications with Apache Flink, Elasticsearch, and Kibana](https://www.elastic.co/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana) is a blog post at elastic.co showing how to build a real-time dashboard solution for streaming data analytics using Apache Flink, Elasticsearch, and Kibana.
 
-* The [Flink training website](http://training.data-artisans.com/) from data Artisans has a number of examples. Check out the hands-on sections and the exercises.
\ No newline at end of file
+* The [Flink training website](http://training.data-artisans.com/) from data Artisans has a number of examples. Check out the hands-on sections and the exercises.
+
+{% top %}
diff --git a/docs/monitoring/debugging_event_time.md b/docs/monitoring/debugging_event_time.md
index 10a3fb20fdb..8ac72ebe663 100644
--- a/docs/monitoring/debugging_event_time.md
+++ b/docs/monitoring/debugging_event_time.md
@@ -33,13 +33,13 @@ is tracked within the system.
 
 Low watermarks of each task can be accessed through Flink web interface or [metrics system]({{ site.baseurl }}/monitoring/metrics.html).
 
-Each Task in Flink exposes a metric called `currentLowWatermark` that represents the lowest watermark received
+Each Task in Flink exposes a metric called `currentInputWatermark` that represents the lowest watermark received
 by this task. This long value represents the "current event time".
 The value is calculated by taking the minimum of all watermarks received by upstream operators. This means that 
 the event time tracked with watermarks is always dominated by the furthest-behind source.
 
 The low watermark metric is accessible **using the web interface**, by choosing a task in the metric tab,
-and selecting the `<taskNr>.currentLowWatermark` metric. In the new box you'll now be able to see 
+and selecting the `<taskNr>.currentInputWatermark` metric. In the new box you'll now be able to see 
 the current low watermark of the task.
 
 Another way of getting the metric is using one of the **metric reporters**, as described in the documentation
diff --git a/docs/ops/deployment/hadoop.md b/docs/ops/deployment/hadoop.md
index f2f060c076f..b07f9259e68 100644
--- a/docs/ops/deployment/hadoop.md
+++ b/docs/ops/deployment/hadoop.md
@@ -45,3 +45,5 @@ export HADOOP_CLASSPATH=`hadoop classpath`
 {% endhighlight %}
 
 in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath.
+
+{% top %}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 3b3132b3d11..16b2adfd196 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -554,7 +554,7 @@ public void enableGenericTypes() {
 	 * is used, Flink will throw an {@code UnsupportedOperationException} whenever it encounters
 	 * a data type that would go through Kryo for serialization.
 	 *
-	 * <p>Disabling generic types can be helpful to eagerly find and eliminate teh use of types
+	 * <p>Disabling generic types can be helpful to eagerly find and eliminate the use of types
 	 * that would go through Kryo serialization during runtime. Rather than checking types
 	 * individually, using this option will throw exceptions eagerly in the places where generic
 	 * types are used.
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
index 01abd766b89..016e2320dc8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/JobID.java
@@ -26,7 +26,7 @@
 
 /**
  * Unique (at least statistically unique) identifier for a Flink Job. Jobs in Flink correspond
- * do dataflow graphs.
+ * to dataflow graphs.
  * 
  * <p>Jobs act simultaneously as <i>sessions</i>, because jobs can be created and submitted
  * incrementally in different parts. Newer fragments of a graph can be attached to existing
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
index ec2b542aa76..08b33334653 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ListSerializer.java
@@ -32,7 +32,7 @@
 
 /**
  * A serializer for {@link List Lists}. The serializer relies on an element serializer
- * for teh serialization of the list's elements.
+ * for the serialization of the list's elements.
  *
 * <p>The serialization format for the list is as follows: four bytes for the length of the list,
  * followed by the serialized representation of each element.
diff --git a/flink-dist/src/main/resources/flink-conf.yaml b/flink-dist/src/main/resources/flink-conf.yaml
index efdf33e3852..25d4435452c 100644
--- a/flink-dist/src/main/resources/flink-conf.yaml
+++ b/flink-dist/src/main/resources/flink-conf.yaml
@@ -181,7 +181,7 @@ rest.port: 8081
 
 # The amount of memory going to the network stack. These numbers usually need 
 # no tuning. Adjusting them may be necessary in case of an "Insufficient number
-# of network buffers" error. The default min is 64MB, teh default max is 1GB.
+# of network buffers" error. The default min is 64MB, the default max is 1GB.
 # 
 # taskmanager.network.memory.fraction: 0.1
 # taskmanager.network.memory.min: 64mb
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
index bdc0c6749f6..d4c619d2ad1 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/ClosureCleaner.java
@@ -56,7 +56,7 @@
 	 *                                 not serializable after the closure cleaning.
 	 *
 	 * @throws RuntimeException A RuntimeException may be thrown, if the code of the class could not
-	 *                          be loaded, in order to process during teh closure cleaning.
+	 *                          be loaded, in order to process during the closure cleaning.
 	 */
 	public static void clean(Object func, boolean checkSerializable) {
 		if (func == null) {
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index 44f80afe9f7..d7e7d3fa88e 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -20,14 +20,11 @@ package org.apache.flink.cep.scala
 import java.util.{UUID, List => JList, Map => JMap}
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.cep.pattern.{Pattern => JPattern}
-import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.cep.{EventComparator, PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
+import org.apache.flink.cep.functions.PatternProcessFunction
+import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream}
 import org.apache.flink.streaming.api.scala.{asScalaStream, _}
 import org.apache.flink.util.Collector
 
-import org.apache.flink.cep.operator.CEPOperatorUtils
-import org.apache.flink.cep.scala.pattern.Pattern
 import scala.collection.Map
 
 /**
@@ -43,11 +40,19 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
   private[flink] def wrappedPatternStream = jPatternStream
 
-  def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
-
-  def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream)
-
-  def getComparator: EventComparator[T] = jPatternStream.getComparator
+  /**
+    * Applies a process function to the detected pattern sequence. For each pattern sequence the
+    * provided [[PatternProcessFunction]] is called.
+    *
+    * @param patternProcessFunction The pattern process function which is called for each detected
+    *                              pattern sequence.
+    * @tparam R Type of the resulting elements
+    * @return [[DataStream]] which contains the resulting elements from the pattern select function.
+    */
+  def process[R: TypeInformation](patternProcessFunction: PatternProcessFunction[T, R])
+  : DataStream[R] = {
+    asScalaStream(jPatternStream.process(patternProcessFunction, implicitly[TypeInformation[R]]))
+  }
 
   /**
     * Applies a select function to the detected pattern sequence. For each pattern sequence the
diff --git a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
index f3371c86c84..5f6055d421b 100644
--- a/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
+++ b/flink-libraries/flink-cep-scala/src/test/scala/org/apache/flink/cep/scala/PatternStreamScalaJavaAPIInteroperabilityTest.scala
@@ -17,31 +17,31 @@
  */
 package org.apache.flink.cep.scala
 
-import org.apache.flink.api.common.functions.util.ListCollector
+import java.lang
+
+import org.apache.flink.api.common.functions.RuntimeContext
+import org.apache.flink.api.common.functions.util.{FunctionUtils, ListCollector}
+import org.apache.flink.cep.functions.{PatternProcessFunction, TimedOutPartialMatchHandler}
+import org.apache.flink.cep.operator.CepOperator
 import org.apache.flink.cep.scala.pattern.Pattern
-import org.apache.flink.streaming.api.operators.{StreamFlatMap, StreamMap}
+import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.streaming.api.transformations.{OneInputTransformation, TwoInputTransformation}
+import org.apache.flink.streaming.api.transformations.OneInputTransformation
+import org.apache.flink.util
 import org.apache.flink.util.{Collector, TestLogger}
-import org.apache.flink.types.{Either => FEither}
-import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
-import java.lang.{Long => JLong}
-import java.util.{Map => JMap}
-import java.util.{List => JList}
-
-import org.apache.flink.cep.operator.{FlatSelectCepOperator, FlatSelectTimeoutCepOperator, SelectCepOperator}
-import org.apache.flink.streaming.api.functions.co.CoMapFunction
-
-import scala.collection.JavaConverters._
-import scala.collection.Map
 import org.junit.Assert._
 import org.junit.Test
+import org.mockito.Mockito
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+import scala.collection.{Map, mutable}
 
 class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
 
   @Test
   @throws[Exception]
-  def testScalaJavaAPISelectFunForwarding {
+  def testScalaJavaAPISelectFunForwarding() {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val dummyDataStream: DataStream[(Int, Int)] = env.fromElements()
     val pattern: Pattern[(Int, Int), (Int, Int)] = Pattern.begin[(Int, Int)]("dummy")
@@ -51,17 +51,21 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
       .select((pattern: Map[String, Iterable[(Int, Int)]]) => {
         //verifies input parameter forwarding
         assertEquals(param, pattern)
-        param.get("begin").get(0)
+        param("begin").head
       })
-    val out = extractUserFunction[SelectCepOperator[(Int, Int), Byte, (Int, Int)]](result)
-      .getUserFunction.select(param.mapValues(_.asJava).asJava)
+
+    val outList = new java.util.ArrayList[(Int, Int)]
+    val outParam = new ListCollector[(Int, Int)](outList)
+
+    val fun = extractFun[(Int, Int), (Int, Int)](result)
+    fun.processMatch(param.mapValues(_.asJava).asJava, new ListTestContext, outParam)
     //verifies output parameter forwarding
-    assertEquals(param.get("begin").get(0), out)
+    assertEquals(param("begin").head, outList.get(0))
   }
 
   @Test
   @throws[Exception]
-  def testScalaJavaAPIFlatSelectFunForwarding {
+  def testScalaJavaAPIFlatSelectFunForwarding() {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val dummyDataStream: DataStream[List[Int]] = env.fromElements()
     val pattern: Pattern[List[Int], List[Int]] = Pattern.begin[List[Int]]("dummy")
@@ -76,18 +80,18 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
       .flatSelect((pattern: Map[String, Iterable[List[Int]]], out: Collector[List[Int]]) => {
         //verifies input parameter forwarding
         assertEquals(inParam, pattern)
-        out.collect(pattern.get("begin").get.head)
+        out.collect(pattern("begin").head)
       })
 
-    extractUserFunction[FlatSelectCepOperator[List[Int], Byte, List[Int]]](result).
-      getUserFunction.flatSelect(inParam.mapValues(_.asJava).asJava, outParam)
+    val fun = extractFun[List[Int], List[Int]](result)
+    fun.processMatch(inParam.mapValues(_.asJava).asJava, new ListTestContext, outParam)
     //verify output parameter forwarding and that flatMap function was actually called
     assertEquals(inList, outList.get(0))
   }
 
   @Test
   @throws[Exception]
-  def testTimeoutHandling: Unit = {
+  def testTimeoutHandling(): Unit = {
     val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
     val dummyDataStream: DataStream[String] = env.fromElements()
     val pattern: Pattern[String, String] = Pattern.begin[String]("dummy")
@@ -95,8 +99,6 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
     val inParam = Map("begin" -> List("barfoo"))
     val outList = new java.util.ArrayList[Either[String, String]]
     val output = new ListCollector[Either[String, String]](outList)
-    val expectedOutput = List(Right("match"), Right("barfoo"), Left("timeout"), Left("barfoo"))
-      .asJava
 
     val outputTag = OutputTag[Either[String, String]]("timeouted")
     val result: DataStream[Either[String, String]] = pStream.flatSelect(outputTag) {
@@ -112,22 +114,45 @@ class PatternStreamScalaJavaAPIInteroperabilityTest extends TestLogger {
           out.collect(Right(pattern("begin").head))
       }
 
-    val fun = extractUserFunction[FlatSelectTimeoutCepOperator[String, Either[String, String],
-      Either[String, String], Byte]](
-      result).getUserFunction
+    val fun = extractFun[String, Either[String, String]](result)
 
-    fun.getFlatSelectFunction.flatSelect(inParam.mapValues(_.asJava).asJava, output)
-    fun.getFlatTimeoutFunction.timeout(inParam.mapValues(_.asJava).asJava, 42L, output)
+    val ctx = new ListTestContext
+    fun.processMatch(inParam.mapValues(_.asJava).asJava, ctx, output)
+    fun.asInstanceOf[TimedOutPartialMatchHandler[String]]
+      .processTimedOutMatch(inParam.mapValues(_.asJava).asJava, ctx)
 
-    assertEquals(expectedOutput, outList)
+    assertEquals(List(Right("match"), Right("barfoo")).asJava, outList)
+    assertEquals(List(Left("timeout"), Left("barfoo")).asJava, ctx.getElements(outputTag).asJava)
   }
 
-  def extractUserFunction[T](dataStream: DataStream[_]) = {
-    dataStream.javaStream
+  def extractFun[IN, OUT](dataStream: DataStream[OUT]): PatternProcessFunction[IN, OUT] = {
+    val oper = dataStream.javaStream
       .getTransformation
       .asInstanceOf[OneInputTransformation[_, _]]
       .getOperator
-      .asInstanceOf[T]
+      .asInstanceOf[CepOperator[IN, Byte, OUT]]
+
+    val fun = oper.getUserFunction
+    FunctionUtils.setFunctionRuntimeContext(fun, Mockito.mock(classOf[RuntimeContext]))
+    FunctionUtils.openFunction(fun, new Configuration())
+    fun
+  }
+
+  class ListTestContext extends PatternProcessFunction.Context {
+
+    private val outputs = new mutable.HashMap[util.OutputTag[_], mutable.ListBuffer[Any]]()
+
+    def getElements(outputTag: OutputTag[_]): ListBuffer[Any] = {
+      outputs.getOrElse(outputTag, ListBuffer.empty)
+    }
+
+    override def output[X](outputTag: util.OutputTag[X], value: X): Unit = {
+      outputs.getOrElseUpdate(outputTag, ListBuffer.empty).append(value)
+    }
+
+    override def timestamp(): lang.Long = null
+
+    override def currentProcessingTime(): Long = System.currentTimeMillis()
   }
 
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
index e6845609759..377487921b9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CEP.java
@@ -48,7 +48,11 @@
 	 * @param <T> Type of the input events
 	 * @return Resulting pattern stream
 	 */
-	public static <T> PatternStream<T> pattern(DataStream<T> input, Pattern<T, ?> pattern, EventComparator<T> comparator) {
-		return new PatternStream<>(input, pattern, comparator);
+	public static <T> PatternStream<T> pattern(
+			DataStream<T> input,
+			Pattern<T, ?> pattern,
+			EventComparator<T> comparator) {
+		final PatternStream<T> stream = new PatternStream<>(input, pattern);
+		return stream.withComparator(comparator);
 	}
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternProcessFunctionBuilder.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternProcessFunctionBuilder.java
new file mode 100644
index 00000000000..abdd0611f1a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternProcessFunctionBuilder.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.adaptors.PatternFlatSelectAdapter;
+import org.apache.flink.cep.functions.adaptors.PatternSelectAdapter;
+import org.apache.flink.cep.functions.adaptors.PatternTimeoutFlatSelectAdapter;
+import org.apache.flink.cep.functions.adaptors.PatternTimeoutSelectAdapter;
+import org.apache.flink.util.OutputTag;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Builder for adapting pre-1.8 functions like {@link PatternFlatSelectFunction}, {@link PatternFlatTimeoutFunction}
+ * into {@link PatternProcessFunction}.
+ */
+@Internal
+class PatternProcessFunctionBuilder {
+
+	/**
+	 * Starts constructing a {@link PatternProcessFunction} from a {@link PatternFlatSelectFunction} that
+	 * emitted elements through {@link org.apache.flink.util.Collector}.
+	 */
+	static <IN, OUT> FlatSelectBuilder<IN, OUT> fromFlatSelect(final PatternFlatSelectFunction<IN, OUT> function) {
+		return new FlatSelectBuilder<>(function);
+	}
+
+	/**
+	 * Starts constructing a {@link PatternProcessFunction} from a {@link PatternSelectFunction} that
+	 * emitted elements through return value.
+	 */
+	static <IN, OUT> SelectBuilder<IN, OUT> fromSelect(final PatternSelectFunction<IN, OUT> function) {
+		return new SelectBuilder<>(function);
+	}
+
+	/**
+	 * Wraps {@link PatternFlatSelectFunction} in a builder. The builder can construct a
+	 * {@link PatternProcessFunction} adapter.
+	 */
+	static class FlatSelectBuilder<IN, OUT> {
+
+		private final PatternFlatSelectFunction<IN, OUT> flatSelectFunction;
+
+		FlatSelectBuilder(PatternFlatSelectFunction<IN, OUT> function) {
+			this.flatSelectFunction = checkNotNull(function);
+		}
+
+		<TIMED_OUT> FlatTimeoutSelectBuilder<IN, OUT, TIMED_OUT> withTimeoutHandler(
+				final OutputTag<TIMED_OUT> outputTag,
+				final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler) {
+			return new FlatTimeoutSelectBuilder<>(flatSelectFunction, timeoutHandler, outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternFlatSelectAdapter<>(flatSelectFunction);
+		}
+	}
+
+	/**
+	 * Wraps {@link PatternFlatSelectFunction} and {@link PatternFlatTimeoutFunction} in a builder. The builder will
+	 * create a {@link PatternProcessFunction} adapter that handles timed out partial matches as well.
+	 */
+	static class FlatTimeoutSelectBuilder<IN, OUT, TIMED_OUT> {
+		private final PatternFlatSelectFunction<IN, OUT> flatSelectFunction;
+
+		private final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler;
+		private final OutputTag<TIMED_OUT> outputTag;
+
+		FlatTimeoutSelectBuilder(
+				final PatternFlatSelectFunction<IN, OUT> flatSelectFunction,
+				final PatternFlatTimeoutFunction<IN, TIMED_OUT> timeoutHandler,
+				final OutputTag<TIMED_OUT> outputTag) {
+			this.flatSelectFunction = checkNotNull(flatSelectFunction);
+			this.timeoutHandler = checkNotNull(timeoutHandler);
+			this.outputTag = checkNotNull(outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternTimeoutFlatSelectAdapter<>(flatSelectFunction, timeoutHandler, outputTag);
+		}
+	}
+
+	/**
+	 * Wraps {@link PatternSelectFunction} in a builder. The builder can construct a
+	 * {@link PatternProcessFunction} adapter.
+	 */
+	static class SelectBuilder<IN, OUT> {
+
+		private final PatternSelectFunction<IN, OUT> selectFunction;
+
+		SelectBuilder(PatternSelectFunction<IN, OUT> function) {
+			this.selectFunction = checkNotNull(function);
+		}
+
+		<TIMED_OUT> TimeoutSelectBuilder<IN, OUT, TIMED_OUT> withTimeoutHandler(
+				final OutputTag<TIMED_OUT> outputTag,
+				final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler) {
+			return new TimeoutSelectBuilder<>(selectFunction, timeoutHandler, outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternSelectAdapter<>(selectFunction);
+		}
+	}
+
+	/**
+	 * Wraps {@link PatternSelectFunction} and {@link PatternTimeoutFunction} in a builder. The builder will create a
+	 * {@link PatternProcessFunction} adapter that handles timed out partial matches as well.
+	 */
+	static class TimeoutSelectBuilder<IN, OUT, TIMED_OUT> {
+		private final PatternSelectFunction<IN, OUT> selectFunction;
+
+		private final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler;
+		private final OutputTag<TIMED_OUT> outputTag;
+
+		TimeoutSelectBuilder(
+				final PatternSelectFunction<IN, OUT> flatSelectFunction,
+				final PatternTimeoutFunction<IN, TIMED_OUT> timeoutHandler,
+				final OutputTag<TIMED_OUT> outputTag) {
+			this.selectFunction = checkNotNull(flatSelectFunction);
+			this.timeoutHandler = checkNotNull(timeoutHandler);
+			this.outputTag = checkNotNull(outputTag);
+		}
+
+		PatternProcessFunction<IN, OUT> build() {
+			return new PatternTimeoutSelectAdapter<>(selectFunction, timeoutHandler, outputTag);
+		}
+	}
+}
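
For orientation, the builder above is used internally by `PatternStream` roughly as follows; a sketch, assuming `selectFunction`, `timeoutFunction`, and `outputTag` instances are defined elsewhere:

{% highlight java %}
// Adapt an old-style select function, plus a timeout handler, into a
// PatternProcessFunction (the builder is @Internal; shown for illustration).
PatternProcessFunction<Event, ComplexEvent> processFunction =
    PatternProcessFunctionBuilder
        .fromSelect(selectFunction)                      // PatternSelectFunction<Event, ComplexEvent>
        .withTimeoutHandler(outputTag, timeoutFunction)  // OutputTag<TimeoutEvent>, PatternTimeoutFunction<Event, TimeoutEvent>
        .build();

// Without timeout handling, build() wraps the function directly.
PatternProcessFunction<Event, ComplexEvent> plain =
    PatternProcessFunctionBuilder.fromSelect(selectFunction).build();
{% endhighlight %}
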
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index 521665f9a63..ca918a9e199 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -19,11 +19,11 @@
 package org.apache.flink.cep;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.EitherTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.cep.operator.CEPOperatorUtils;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -33,6 +33,10 @@
 
 import java.util.UUID;
 
+import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromFlatSelect;
+import static org.apache.flink.cep.PatternProcessFunctionBuilder.fromSelect;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Stream abstraction for CEP pattern detection. A pattern stream is a stream which emits detected
  * pattern sequences as a map of events associated with their names. The pattern is detected using a
@@ -46,42 +50,68 @@
  */
 public class PatternStream<T> {
 
-	// underlying data stream
-	private final DataStream<T> inputStream;
-
-	private final Pattern<T, ?> pattern;
+	private final PatternStreamBuilder<T> builder;
 
-	// comparator to sort events
-	private final EventComparator<T> comparator;
-
-	/**
-	 * Side output {@code OutputTag} for late data. If no tag is set late data will be simply
-	 * dropped.
-	 */
-	private OutputTag<T> lateDataOutputTag;
+	private PatternStream(final PatternStreamBuilder<T> builder) {
+		this.builder = checkNotNull(builder);
+	}
 
 	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) {
-		this.inputStream = inputStream;
-		this.pattern = pattern;
-		this.comparator = null;
+		this(PatternStreamBuilder.forStreamAndPattern(inputStream, pattern));
 	}
 
-	PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern, final EventComparator<T> comparator) {
-		this.inputStream = inputStream;
-		this.pattern = pattern;
-		this.comparator = comparator;
+	PatternStream<T> withComparator(final EventComparator<T> comparator) {
+		return new PatternStream<>(builder.withComparator(comparator));
 	}
 
-	public Pattern<T, ?> getPattern() {
-		return pattern;
+	public PatternStream<T> sideOutputLateData(OutputTag<T> lateDataOutputTag) {
+		return new PatternStream<>(builder.withLateDataOutputTag(lateDataOutputTag));
 	}
 
-	public DataStream<T> getInputStream() {
-		return inputStream;
+	/**
+	 * Applies a process function to the detected pattern sequence. For each pattern sequence the
+	 * provided {@link PatternProcessFunction} is called. In order to process timed out partial matches as well one can
+	 * use {@link TimedOutPartialMatchHandler} as additional interface.
+	 *
+	 * @param patternProcessFunction The pattern process function which is called for each detected
+	 *                               pattern sequence.
+	 * @param <R> Type of the resulting elements
+	 * @return {@link DataStream} which contains the resulting elements from the pattern process
+	 *         function.
+	 */
+	public <R> SingleOutputStreamOperator<R> process(final PatternProcessFunction<T, R> patternProcessFunction) {
+		final TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
+			patternProcessFunction,
+			PatternProcessFunction.class,
+			0,
+			1,
+			TypeExtractor.NO_INDEX,
+			builder.getInputType(),
+			null,
+			false);
+
+		return process(patternProcessFunction, returnType);
 	}
 
-	public EventComparator<T> getComparator() {
-		return comparator;
+	/**
+	 * Applies a process function to the detected pattern sequence. For each pattern sequence the
+	 * provided {@link PatternProcessFunction} is called. In order to process timed out partial matches as well one can
+	 * use {@link TimedOutPartialMatchHandler} as additional interface.
+	 *
+	 * @param patternProcessFunction The pattern process function which is called for each detected
+	 *                              pattern sequence.
+	 * @param <R> Type of the resulting elements
+	 * @param outTypeInfo Explicit specification of output type.
+	 * @return {@link DataStream} which contains the resulting elements from the pattern process
+	 *         function.
+	 */
+	public <R> SingleOutputStreamOperator<R> process(
+			final PatternProcessFunction<T, R> patternProcessFunction,
+			final TypeInformation<R> outTypeInfo) {
+
+		return builder.build(
+			outTypeInfo,
+			builder.clean(patternProcessFunction));
 	}
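
A sketch of calling the two `process` overloads above; `MyPatternProcessFunction` and the `Alert` output type are assumptions:

{% highlight java %}
// Output type extracted automatically from the function's type parameters.
// MyPatternProcessFunction is assumed to extend PatternProcessFunction<Event, Alert>.
SingleOutputStreamOperator<Alert> alerts =
    patternStream.process(new MyPatternProcessFunction());

// Output type supplied explicitly, e.g. when it cannot be extracted.
SingleOutputStreamOperator<Alert> alertsExplicit = patternStream.process(
    new MyPatternProcessFunction(),
    TypeInformation.of(Alert.class));
{% endhighlight %}
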
 
 	/**
@@ -99,28 +129,19 @@
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
 
-		TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> returnType = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
 		return select(patternSelectFunction, returnType);
 	}
 
-	/**
-	 * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
-	 * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
-	 *
-	 * @return The cleaned Function
-	 */
-	private  <F> F clean(F f) {
-		return inputStream.getExecutionEnvironment().clean(f);
-	}
 
 	/**
 	 * Applies a select function to the detected pattern sequence. For each pattern sequence the
@@ -134,8 +155,14 @@
 	 * @return {@link DataStream} which contains the resulting elements from the pattern select
 	 *         function.
 	 */
-	public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) {
-		return CEPOperatorUtils.createPatternStream(inputStream, pattern, comparator, clean(patternSelectFunction), outTypeInfo, lateDataOutputTag);
+	public <R> SingleOutputStreamOperator<R> select(
+			final PatternSelectFunction<T, R> patternSelectFunction,
+			final TypeInformation<R> outTypeInfo) {
+
+		final PatternProcessFunction<T, R> processFunction =
+			fromSelect(builder.clean(patternSelectFunction)).build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -152,7 +179,7 @@
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
 	 * @param patternTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param patternSelectFunction The pattern select function which is called for each detected
@@ -163,22 +190,22 @@
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> select(
-		final OutputTag<L> timeoutOutputTag,
-		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
-		final PatternSelectFunction<T, R> patternSelectFunction) {
+			final OutputTag<L> timedOutPartialMatchesTag,
+			final PatternTimeoutFunction<T, L> patternTimeoutFunction,
+			final PatternSelectFunction<T, R> patternSelectFunction) {
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
 		return select(
-			timeoutOutputTag,
+			timedOutPartialMatchesTag,
 			patternTimeoutFunction,
 			rightTypeInfo,
 			patternSelectFunction);
@@ -198,7 +225,7 @@
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
 	 * @param patternTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param outTypeInfo Explicit specification of output type.
@@ -210,19 +237,17 @@
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> select(
-			final OutputTag<L> timeoutOutputTag,
+			final OutputTag<L> timedOutPartialMatchesTag,
 			final PatternTimeoutFunction<T, L> patternTimeoutFunction,
 			final TypeInformation<R> outTypeInfo,
 			final PatternSelectFunction<T, R> patternSelectFunction) {
-		return CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternSelectFunction),
-			outTypeInfo,
-			timeoutOutputTag,
-			clean(patternTimeoutFunction),
-			lateDataOutputTag);
+
+		final PatternProcessFunction<T, R> processFunction =
+			fromSelect(builder.clean(patternSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternTimeoutFunction))
+				.build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -249,46 +274,45 @@
 	 */
 	@Deprecated
 	public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
-		final PatternTimeoutFunction<T, L> patternTimeoutFunction,
-		final PatternSelectFunction<T, R> patternSelectFunction) {
+			final PatternTimeoutFunction<T, L> patternTimeoutFunction,
+			final PatternSelectFunction<T, R> patternSelectFunction) {
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> mainTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternSelectFunction,
 			PatternSelectFunction.class,
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
-		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<L> timeoutTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternTimeoutFunction,
 			PatternTimeoutFunction.class,
 			0,
 			1,
 			TypeExtractor.NO_INDEX,
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
-		final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo);
+		final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timeoutTypeInfo, mainTypeInfo);
 
-		final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternSelectFunction),
-			rightTypeInfo,
-			outputTag,
-			clean(patternTimeoutFunction),
-			lateDataOutputTag);
+		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timeoutTypeInfo);
 
-		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
+		final PatternProcessFunction<T, R> processFunction =
+			fromSelect(builder.clean(patternSelectFunction))
+				.withTimeoutHandler(outputTag, builder.clean(patternTimeoutFunction))
+				.build();
 
-		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
+		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
+		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
 
-		return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
+		return mainStream
+			.connect(timedOutStream)
+			.map(new CoMapTimeout<>())
+			.returns(outTypeInfo);
 	}
 
 	/**
@@ -305,13 +329,14 @@
 	public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 		// we have to extract the output type from the provided pattern selection function manually
 		// because the TypeExtractor cannot do that if the method is wrapped in a MapFunction
-		TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+
+		final TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
 			1,
-			new int[] {1, 0},
-			inputStream.getType(),
+			new int[]{1, 0},
+			builder.getInputType(),
 			null,
 			false);
 
@@ -333,13 +358,12 @@
 	public <R> SingleOutputStreamOperator<R> flatSelect(
 			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction,
 			final TypeInformation<R> outTypeInfo) {
-		return CEPOperatorUtils.createPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternFlatSelectFunction),
-			outTypeInfo,
-			lateDataOutputTag);
+
+		final PatternProcessFunction<T, R> processFunction =
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -356,7 +380,7 @@
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
 	 * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param patternFlatSelectFunction The pattern select function which is called for each detected
@@ -367,21 +391,25 @@
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> flatSelect(
-		final OutputTag<L> timeoutOutputTag,
-		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
-		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+			final OutputTag<L> timedOutPartialMatchesTag,
+			final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
 			1,
 			new int[]{1, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
-		return flatSelect(timeoutOutputTag, patternFlatTimeoutFunction, rightTypeInfo, patternFlatSelectFunction);
+		return flatSelect(
+			timedOutPartialMatchesTag,
+			patternFlatTimeoutFunction,
+			rightTypeInfo,
+			patternFlatSelectFunction);
 	}
 
 	/**
@@ -398,7 +426,7 @@
 	 * {@link SingleOutputStreamOperator} resulting from the select operation
 	 * with the same {@link OutputTag}.
 	 *
-	 * @param timeoutOutputTag {@link OutputTag} that identifies side output with timed out patterns
+	 * @param timedOutPartialMatchesTag {@link OutputTag} that identifies side output with timed out patterns
 	 * @param patternFlatTimeoutFunction The pattern timeout function which is called for each partial
 	 *                               pattern sequence which has timed out.
 	 * @param patternFlatSelectFunction The pattern select function which is called for each detected
@@ -410,20 +438,17 @@
 	 * elements in a side output.
 	 */
 	public <L, R> SingleOutputStreamOperator<R> flatSelect(
-		final OutputTag<L> timeoutOutputTag,
-		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
-		final TypeInformation<R> outTypeInfo,
-		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
-
-		return CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternFlatSelectFunction),
-			outTypeInfo,
-			timeoutOutputTag,
-			clean(patternFlatTimeoutFunction),
-			lateDataOutputTag);
+			final OutputTag<L> timedOutPartialMatchesTag,
+			final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+			final TypeInformation<R> outTypeInfo,
+			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+
+		final PatternProcessFunction<T, R> processFunction =
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.withTimeoutHandler(timedOutPartialMatchesTag, builder.clean(patternFlatTimeoutFunction))
+				.build();
+
+		return process(processFunction, outTypeInfo);
 	}
 
 	/**
@@ -451,51 +476,44 @@
 	 */
 	@Deprecated
 	public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
-		final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
-		final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
+			final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction,
+			final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
 
-		TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<L> timedOutTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatTimeoutFunction,
 			PatternFlatTimeoutFunction.class,
 			0,
 			1,
 			new int[]{2, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
-		TypeInformation<R> rightTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
+		final TypeInformation<R> mainTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
 			patternFlatSelectFunction,
 			PatternFlatSelectFunction.class,
 			0,
 			1,
 			new int[]{1, 0},
-			inputStream.getType(),
+			builder.getInputType(),
 			null,
 			false);
 
-		final OutputTag<L> outputTag = new OutputTag<L>(UUID.randomUUID().toString(), leftTypeInfo);
+		final OutputTag<L> outputTag = new OutputTag<>(UUID.randomUUID().toString(), timedOutTypeInfo);
 
-		final SingleOutputStreamOperator<R> mainStream = CEPOperatorUtils.createTimeoutPatternStream(
-			inputStream,
-			pattern,
-			comparator,
-			clean(patternFlatSelectFunction),
-			rightTypeInfo,
-			outputTag,
-			clean(patternFlatTimeoutFunction),
-			lateDataOutputTag);
+		final PatternProcessFunction<T, R> processFunction =
+			fromFlatSelect(builder.clean(patternFlatSelectFunction))
+				.withTimeoutHandler(outputTag, builder.clean(patternFlatTimeoutFunction))
+				.build();
 
+		final SingleOutputStreamOperator<R> mainStream = process(processFunction, mainTypeInfo);
 		final DataStream<L> timedOutStream = mainStream.getSideOutput(outputTag);
+		final TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(timedOutTypeInfo, mainTypeInfo);
 
-		TypeInformation<Either<L, R>> outTypeInfo = new EitherTypeInfo<>(leftTypeInfo, rightTypeInfo);
-
-		return mainStream.connect(timedOutStream).map(new CoMapTimeout<>()).returns(outTypeInfo);
-	}
-
-	public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) {
-		this.lateDataOutputTag = clean(outputTag);
-		return this;
+		return mainStream
+				.connect(timedOutStream)
+				.map(new CoMapTimeout<>())
+				.returns(outTypeInfo);
 	}
 
 	/**
@@ -507,12 +525,12 @@
 		private static final long serialVersionUID = 2059391566945212552L;
 
 		@Override
-		public Either<L, R> map1(R value) throws Exception {
+		public Either<L, R> map1(R value) {
 			return Either.Right(value);
 		}
 
 		@Override
-		public Either<L, R> map2(L value) throws Exception {
+		public Either<L, R> map2(L value) {
 			return Either.Left(value);
 		}
 	}
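
Note on usage: the refactoring above routes every select/flatSelect variant
through the new PatternStream#process entry point. A minimal sketch of the
preferred API, assuming hypothetical Event/Alert types and a pattern state
named "start" (all illustrative, not part of this diff):

    PatternStream<Event> patternStream = CEP.pattern(input, pattern);

    DataStream<Alert> alerts = patternStream.process(
        new PatternProcessFunction<Event, Alert>() {
            @Override
            public void processMatch(
                    Map<String, List<Event>> match,
                    Context ctx,
                    Collector<Alert> out) {
                // emit one alert per fully matched sequence
                out.collect(new Alert(match.get("start").get(0)));
            }
        });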
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
new file mode 100644
index 00000000000..13d68e287d7
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStreamBuilder.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.functions.NullByteKeySelector;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Utility class for creating a {@link PatternStream}.
+ */
+@Internal
+final class PatternStreamBuilder<IN> {
+
+	private final DataStream<IN> inputStream;
+
+	private final Pattern<IN, ?> pattern;
+
+	private final EventComparator<IN> comparator;
+
+	/**
+	 * Side output {@code OutputTag} for late data.
+	 * If no tag is set, late data will simply be dropped.
+	 */
+	private final OutputTag<IN> lateDataOutputTag;
+
+	private PatternStreamBuilder(
+			final DataStream<IN> inputStream,
+			final Pattern<IN, ?> pattern,
+			@Nullable final EventComparator<IN> comparator,
+			@Nullable final OutputTag<IN> lateDataOutputTag) {
+
+		this.inputStream = checkNotNull(inputStream);
+		this.pattern = checkNotNull(pattern);
+		this.comparator = comparator;
+		this.lateDataOutputTag = lateDataOutputTag;
+	}
+
+	TypeInformation<IN> getInputType() {
+		return inputStream.getType();
+	}
+
+	/**
+	 * Invokes the {@link org.apache.flink.api.java.ClosureCleaner}
+	 * on the given function if closure cleaning is enabled in the {@link ExecutionConfig}.
+	 *
+	 * @return The cleaned Function
+	 */
+	<F> F clean(F f) {
+		return inputStream.getExecutionEnvironment().clean(f);
+	}
+
+	PatternStreamBuilder<IN> withComparator(final EventComparator<IN> comparator) {
+		return new PatternStreamBuilder<>(inputStream, pattern, checkNotNull(comparator), lateDataOutputTag);
+	}
+
+	PatternStreamBuilder<IN> withLateDataOutputTag(final OutputTag<IN> lateDataOutputTag) {
+		return new PatternStreamBuilder<>(inputStream, pattern, comparator, checkNotNull(lateDataOutputTag));
+	}
+
+	/**
+	 * Creates a data stream containing the results of applying the {@link PatternProcessFunction} to fully matching event patterns.
+	 *
+	 * @param processFunction function to be applied to matching event sequences
+	 * @param outTypeInfo output {@link TypeInformation} of the elements emitted by
+	 *        {@link PatternProcessFunction#processMatch(Map, PatternProcessFunction.Context, Collector)}
+	 * @param <OUT> type of output events
+	 * @return Data stream containing the fully matched event sequences with the {@link PatternProcessFunction} applied
+	 */
+	<OUT, K> SingleOutputStreamOperator<OUT> build(
+			final TypeInformation<OUT> outTypeInfo,
+			final PatternProcessFunction<IN, OUT> processFunction) {
+
+		checkNotNull(outTypeInfo);
+		checkNotNull(processFunction);
+
+		final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
+		final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
+
+		final boolean timeoutHandling = processFunction instanceof TimedOutPartialMatchHandler;
+		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
+
+		final CepOperator<IN, K, OUT> operator = new CepOperator<>(
+			inputSerializer,
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			pattern.getAfterMatchSkipStrategy(),
+			processFunction,
+			lateDataOutputTag);
+
+		final SingleOutputStreamOperator<OUT> patternStream;
+		if (inputStream instanceof KeyedStream) {
+			KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
+
+			patternStream = keyedStream.transform(
+				"CepOperator",
+				outTypeInfo,
+				operator);
+		} else {
+			KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
+
+			patternStream = inputStream.keyBy(keySelector).transform(
+				"GlobalCepOperator",
+				outTypeInfo,
+				operator
+			).forceNonParallel();
+		}
+
+		return patternStream;
+	}
+
+	// ---------------------------------------- factory-like methods ---------------------------------------- //
+
+	static <IN> PatternStreamBuilder<IN> forStreamAndPattern(final DataStream<IN> inputStream, final Pattern<IN, ?> pattern) {
+		return new PatternStreamBuilder<>(inputStream, pattern, null, null);
+	}
+}
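
The builder above is @Internal, but its call chain summarizes the new wiring.
A sketch using only methods defined in this class (the surrounding variables
are illustrative):

    PatternStreamBuilder<Event> builder =
        PatternStreamBuilder.forStreamAndPattern(inputStream, pattern)
            .withComparator(comparator)           // optional
            .withLateDataOutputTag(lateDataTag);  // optional

    SingleOutputStreamOperator<Alert> result =
        builder.build(outTypeInfo, builder.clean(processFunction));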
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
index a2b89c37e97..7ac199e69b1 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternFlatSelectFunction.java
@@ -20,9 +20,7 @@
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -42,16 +40,5 @@
 
 	private static final long serialVersionUID = 1L;
 
-	@Override
-	public void setRuntimeContext(RuntimeContext runtimeContext) {
-		Preconditions.checkNotNull(runtimeContext);
-
-		if (runtimeContext instanceof CepRuntimeContext) {
-			super.setRuntimeContext(runtimeContext);
-		} else {
-			super.setRuntimeContext(new CepRuntimeContext(runtimeContext));
-		}
-	}
-
-	public abstract void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception;
+	public abstract void flatSelect(final Map<String, List<IN>> pattern, final Collector<OUT> out) throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
index ce694a38e05..a907e2bb702 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/RichPatternSelectFunction.java
@@ -20,8 +20,6 @@
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.util.Preconditions;
 
 import java.util.List;
 import java.util.Map;
@@ -41,16 +39,5 @@
 
 	private static final long serialVersionUID = 1L;
 
-	@Override
-	public void setRuntimeContext(RuntimeContext runtimeContext) {
-		Preconditions.checkNotNull(runtimeContext);
-
-		if (runtimeContext instanceof CepRuntimeContext) {
-			super.setRuntimeContext(runtimeContext);
-		} else {
-			super.setRuntimeContext(new CepRuntimeContext(runtimeContext));
-		}
-	}
-
-	public abstract OUT select(Map<String, List<IN>> pattern) throws Exception;
+	public abstract OUT select(final Map<String, List<IN>> pattern) throws Exception;
 }
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
new file mode 100644
index 00000000000..23367cdfc6f
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/context/TimerContext.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.context;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+
+/**
+ * Enables access to time-related characteristics such as the current processing time or the timestamp of the
+ * currently processed element. Used in {@link PatternProcessFunction} and
+ * {@link org.apache.flink.cep.pattern.conditions.IterativeCondition}.
+ */
+@PublicEvolving
+public interface TimerContext {
+
+	/**
+	 * Timestamp of the element currently being processed.
+	 *
+	 * <p>This might be {@code null}, for example if the time characteristic of your program
+	 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+	 */
+	Long timestamp();
+
+	/** Returns the current processing time. */
+	long currentProcessingTime();
+
+}
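
Since timestamp() returns a boxed Long that may be null under processing time,
callers need an explicit fallback. This is the idiom the timeout adapters in
this diff use (a sketch, not new API):

    // Fall back to processing time when no event-time timestamp is available.
    long resultTimestamp = ctx.timestamp() != null
        ? ctx.timestamp()
        : ctx.currentProcessingTime();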
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
new file mode 100644
index 00000000000..b392501a683
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/PatternProcessFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.cep.context.TimerContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A function that is called with a map of detected events which are identified by their names.
+ * The names are defined by the {@link org.apache.flink.cep.pattern.Pattern} specifying
+ * the sought-after pattern. This is the preferred way to process found matches.
+ *
+ * <pre>{@code
+ * PatternStream<IN> pattern = ...
+ *
+ * DataStream<OUT> result = pattern.process(new MyPatternProcessFunction());
+ * }</pre>
+ * @param <IN> type of incoming elements
+ * @param <OUT> type of produced elements based on found matches
+ */
+@PublicEvolving
+public abstract class PatternProcessFunction<IN, OUT> extends AbstractRichFunction {
+
+	/**
+	 * Generates resulting elements given a map of detected pattern events. The events
+	 * are identified by their specified names.
+	 *
+	 * <p>{@link PatternProcessFunction.Context#timestamp()} in this case returns the time of the last element that was
+	 * assigned to the match, i.e. the element that turned this partial match into a completed match.
+	 *
+	 * @param match map containing the found pattern. Events are identified by their names.
+	 * @param ctx enables access to time features and emitting results through side outputs
+	 * @param out Collector used to output the generated elements
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	public abstract void processMatch(
+		final Map<String, List<IN>> match,
+		final Context ctx,
+		final Collector<OUT> out) throws Exception;
+
+	/**
+	 * Gives access to time related characteristics as well as enables emitting elements to side outputs.
+	 */
+	public interface Context extends TimerContext {
+		/**
+		 * Emits a record to the side output identified by the {@link OutputTag}.
+		 *
+		 * @param outputTag the {@code OutputTag} that identifies the side output to emit to.
+		 * @param value The record to emit.
+		 */
+		<X> void output(final OutputTag<X> outputTag, final X value);
+	}
+}
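
The Context also allows emitting records to side outputs from within a match
handler. A short sketch, with the tag name and the Event/Alert types purely
illustrative:

    final OutputTag<String> debugTag = new OutputTag<String>("debug-matches") {};

    public class DebuggingFunction extends PatternProcessFunction<Event, Alert> {
        @Override
        public void processMatch(
                Map<String, List<Event>> match,
                Context ctx,
                Collector<Alert> out) {
            // side-channel diagnostic output alongside the main result
            ctx.output(debugTag, "matched states: " + match.keySet());
            out.collect(new Alert(match));
        }
    }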
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
new file mode 100644
index 00000000000..2871039cbe0
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/TimedOutPartialMatchHandler.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.windowing.time.Time;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Enables handling of timed out partial matches. It is meant to be used in a mixin style: if you need your
+ * {@link PatternProcessFunction} to handle timed out partial matches, implement this interface as well.
+ * Example:
+ *
+ * <pre>
+ * {@code
+ * private class MyFunction extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+ *
+ * }
+ * }
+ * </pre>
+ *
+ * @param <IN> type of input elements
+ */
+@PublicEvolving
+public interface TimedOutPartialMatchHandler<IN> {
+
+	/**
+	 * Called for every timed out partial match (due to {@link org.apache.flink.cep.pattern.Pattern#within(Time)}).
+	 * It enables custom handling, e.g. one can emit the timed out results through a side output:
+	 *
+	 * <pre>
+	 * {@code
+	 *
+	 * private final OutputTag<T> timedOutPartialMatchesTag = ...
+	 *
+	 * private class MyFunction extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
+	 *
+	 *     @Override
+	 *     public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception {
+	 *          ...
+	 *     }
+	 *
+	 *     @Override
+	 *     public void processTimedOutMatch(Map<String, List<IN>> match, PatternProcessFunction.Context ctx) throws Exception {
+	 *          ctx.output(timedOutPartialMatchesTag, match);
+	 *     }
+	 * }
+	 * }
+	 * </pre>
+	 *
+	 * <p>{@link PatternProcessFunction.Context#timestamp()} in this case returns the earliest time at which we can
+	 * say that the partial match will no longer become a match, which is effectively the timestamp of the first
+	 * element assigned to the partial match plus the {@code within} duration.
+	 *
+	 * @param match map containing the timed out partial match. Events are identified by their names.
+	 * @param ctx enables access to time features and emitting results through side outputs
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the
+	 *                   operation to fail and may trigger recovery.
+	 */
+	void processTimedOutMatch(
+		final Map<String, List<IN>> match,
+		final PatternProcessFunction.Context ctx) throws Exception;
+}
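
On the consuming side, timed out partial matches surface as a side output of
the stream returned by process, mirroring what the deprecated select/flatSelect
overloads now do internally. A sketch with hypothetical Event/Alert types,
reusing the MyFunction and timedOutPartialMatchesTag names from the Javadoc
example above:

    SingleOutputStreamOperator<Alert> result =
        patternStream.process(new MyFunction());

    DataStream<Map<String, List<Event>>> timedOut =
        result.getSideOutput(timedOutPartialMatchesTag);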
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternFlatSelectAdapter.java
new file mode 100644
index 00000000000..90c93976cc1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternFlatSelectAdapter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses {@link PatternFlatSelectFunction} with {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternFlatSelectAdapter<IN, OUT> extends PatternProcessFunction<IN, OUT> {
+
+	private final PatternFlatSelectFunction<IN, OUT> flatSelectFunction;
+
+	public PatternFlatSelectAdapter(final PatternFlatSelectFunction<IN, OUT> flatSelectFunction) {
+		this.flatSelectFunction = checkNotNull(flatSelectFunction);
+	}
+
+	@Override
+	public void open(final Configuration parameters) throws Exception {
+		FunctionUtils.setFunctionRuntimeContext(flatSelectFunction, getRuntimeContext());
+		FunctionUtils.openFunction(flatSelectFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(flatSelectFunction);
+	}
+
+	@Override
+	public void processMatch(
+			final Map<String, List<IN>> match,
+			final Context ctx,
+			final Collector<OUT> out) throws Exception {
+		flatSelectFunction.flatSelect(match, out);
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternSelectAdapter.java
new file mode 100644
index 00000000000..76ccf9e368a
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternSelectAdapter.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses {@link PatternSelectFunction} with {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternSelectAdapter<IN, OUT> extends PatternProcessFunction<IN, OUT> {
+
+	private final PatternSelectFunction<IN, OUT> selectFunction;
+
+	public PatternSelectAdapter(final PatternSelectFunction<IN, OUT> selectFunction) {
+		this.selectFunction = checkNotNull(selectFunction);
+	}
+
+	@Override
+	public void open(final Configuration parameters) throws Exception {
+		FunctionUtils.setFunctionRuntimeContext(selectFunction, getRuntimeContext());
+		FunctionUtils.openFunction(selectFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		FunctionUtils.closeFunction(selectFunction);
+	}
+
+	@Override
+	public void processMatch(
+			final Map<String, List<IN>> match,
+			final Context ctx,
+			final Collector<OUT> out) throws Exception {
+		out.collect(selectFunction.select(match));
+	}
+}
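
The adapters make the migration mechanical: a legacy select function can be
wrapped by hand, although PatternStream now performs this wrapping internally
through its builder. A sketch, assuming an existing
PatternSelectFunction<Event, Alert> selectFn (the adapter itself is @Internal,
so this is for illustration only):

    PatternProcessFunction<Event, Alert> adapted =
        new PatternSelectAdapter<>(selectFn);

    DataStream<Alert> result = patternStream.process(adapted, outTypeInfo);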
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
new file mode 100644
index 00000000000..ab9c97dde66
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutFlatSelectAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternFlatSelectFunction;
+import org.apache.flink.cep.PatternFlatTimeoutFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses the combination of a {@link PatternFlatSelectFunction} and a
+ * {@link PatternFlatTimeoutFunction} as a {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternTimeoutFlatSelectAdapter<IN, OUT, T>
+		extends PatternFlatSelectAdapter<IN, OUT>
+		implements TimedOutPartialMatchHandler<IN> {
+
+	private final PatternFlatTimeoutFunction<IN, T> flatTimeoutFunction;
+	private final OutputTag<T> timedOutPartialMatchesTag;
+
+	private transient SideCollector<T> sideCollector;
+
+	public PatternTimeoutFlatSelectAdapter(
+			PatternFlatSelectFunction<IN, OUT> flatSelectFunction,
+			PatternFlatTimeoutFunction<IN, T> flatTimeoutFunction,
+			OutputTag<T> timedOutPartialMatchesTag) {
+		super(flatSelectFunction);
+		this.flatTimeoutFunction = checkNotNull(flatTimeoutFunction);
+		this.timedOutPartialMatchesTag = checkNotNull(timedOutPartialMatchesTag);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.setFunctionRuntimeContext(flatTimeoutFunction, getRuntimeContext());
+		FunctionUtils.openFunction(flatTimeoutFunction, parameters);
+
+		if (sideCollector == null) {
+			sideCollector = new SideCollector<>(checkNotNull(timedOutPartialMatchesTag));
+		}
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(flatTimeoutFunction);
+	}
+
+	@Override
+	public void processTimedOutMatch(
+			Map<String, List<IN>> match,
+			Context ctx) throws Exception {
+		sideCollector.setCtx(ctx);
+		long timestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
+		flatTimeoutFunction.timeout(match, timestamp, sideCollector);
+	}
+
+	/**
+	 * Adapter that emits timed out results from a {@link PatternFlatTimeoutFunction}, which expects a
+	 * {@link Collector}, through the {@link PatternProcessFunction.Context} of the enclosing {@link PatternProcessFunction}.
+	 */
+	private static final class SideCollector<T> implements Collector<T> {
+
+		private final OutputTag<T> timedOutPartialMatchesTag;
+
+		private transient Context ctx;
+
+		private SideCollector(OutputTag<T> timedOutPartialMatchesTag) {
+			this.timedOutPartialMatchesTag = checkNotNull(timedOutPartialMatchesTag);
+		}
+
+		public void setCtx(Context ctx) {
+			this.ctx = ctx;
+		}
+
+		@Override
+		public void collect(T record) {
+			ctx.output(timedOutPartialMatchesTag, record);
+		}
+
+		@Override
+		public void close() {}
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
new file mode 100644
index 00000000000..29b0cf768fa
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/functions/adaptors/PatternTimeoutSelectAdapter.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.functions.adaptors;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
+import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.PatternTimeoutFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Adapter that expresses the combination of a {@link PatternSelectFunction} and a
+ * {@link PatternTimeoutFunction} as a {@link PatternProcessFunction}.
+ */
+@Internal
+public class PatternTimeoutSelectAdapter<IN, OUT, T>
+		extends PatternSelectAdapter<IN, OUT>
+		implements TimedOutPartialMatchHandler<IN> {
+
+	private final PatternTimeoutFunction<IN, T> timeoutFunction;
+	private final OutputTag<T> timedOutPartialMatchesTag;
+
+	public PatternTimeoutSelectAdapter(
+			final PatternSelectFunction<IN, OUT> selectFunction,
+			final PatternTimeoutFunction<IN, T> timeoutFunction,
+			final OutputTag<T> timedOutPartialMatchesTag) {
+		super(selectFunction);
+		this.timeoutFunction = checkNotNull(timeoutFunction);
+		this.timedOutPartialMatchesTag = checkNotNull(timedOutPartialMatchesTag);
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		FunctionUtils.setFunctionRuntimeContext(timeoutFunction, getRuntimeContext());
+		FunctionUtils.openFunction(timeoutFunction, parameters);
+	}
+
+	@Override
+	public void close() throws Exception {
+		super.close();
+		FunctionUtils.closeFunction(timeoutFunction);
+	}
+
+	@Override
+	public void processTimedOutMatch(
+			final Map<String, List<IN>> match,
+			final Context ctx) throws Exception {
+
+		final long resultTimestamp = ctx.timestamp() != null ? ctx.timestamp() : ctx.currentProcessingTime();
+		final T timedOutPatternResult = timeoutFunction.timeout(match, resultTimestamp);
+
+		ctx.output(timedOutPartialMatchesTag, timedOutPatternResult);
+	}
+}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 01dcfd97eb4..48bb587a5d4 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -19,6 +19,8 @@
 package org.apache.flink.cep.nfa;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.typeutils.CompatibilityResult;
 import org.apache.flink.api.common.typeutils.CompatibilityUtil;
 import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
@@ -35,8 +37,8 @@
 import org.apache.flink.cep.nfa.sharedbuffer.NodeId;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
-import org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator;
 import org.apache.flink.cep.pattern.conditions.IterativeCondition;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.streaming.api.windowing.time.Time;
@@ -61,7 +63,7 @@
 /**
  * Non-deterministic finite automaton implementation.
  *
- * <p>The {@link AbstractKeyedCEPPatternOperator CEP operator}
+ * <p>The {@link org.apache.flink.cep.operator.CepOperator CEP operator}
  * keeps one NFA per key, for keyed input streams, and a single global NFA for non-keyed ones.
  * When an event gets processed, it updates the NFA's internal state machine.
  *
@@ -169,6 +171,34 @@ private boolean isFinalState(ComputationState state) {
 		return stateObject.isFinal();
 	}
 
+	/**
+	 * Initialization method for the NFA. It is called before any element is passed and is thus suitable for one-time
+	 * setup work.
+	 * @param cepRuntimeContext runtime context of the enclosing operator
+	 * @param conf The configuration containing the parameters attached to the contract.
+	 */
+	public void open(RuntimeContext cepRuntimeContext, Configuration conf) throws Exception {
+		for (State<T> state : getStates()) {
+			for (StateTransition<T> transition : state.getStateTransitions()) {
+				IterativeCondition condition = transition.getCondition();
+				FunctionUtils.setFunctionRuntimeContext(condition, cepRuntimeContext);
+				FunctionUtils.openFunction(condition, conf);
+			}
+		}
+	}
+
+	/**
+	 * Tear-down method for the NFA.
+	 */
+	public void close() throws Exception {
+		for (State<T> state : getStates()) {
+			for (StateTransition<T> transition : state.getStateTransitions()) {
+				IterativeCondition condition = transition.getCondition();
+				FunctionUtils.closeFunction(condition);
+			}
+		}
+	}
+
 	/**
 	 * Processes the next input event. If some of the computations reach a final state then the
 	 * resulting event sequences are returned. If computations time out and timeout handling is
@@ -225,7 +255,7 @@ private boolean isFinalState(ComputationState state) {
 
 	/**
 	 * Prunes states assuming there will be no events with timestamp <b>lower</b> than the given one.
-	 * It cleares the sharedBuffer and also emits all timed out partial matches.
+	 * It clears the sharedBuffer and also emits all timed out partial matches.
 	 *
 	 * @param sharedBufferAccessor the accessor to SharedBuffer object that we need to work upon while processing
 	 * @param nfaState     The NFAState object that we need to affect while processing
@@ -249,7 +279,7 @@ private boolean isFinalState(ComputationState state) {
 					Map<String, List<T>> timedOutPattern = sharedBufferAccessor.materializeMatch(extractCurrentMatches(
 						sharedBufferAccessor,
 						computationState));
-					timeoutResult.add(Tuple2.of(timedOutPattern, timestamp));
+					timeoutResult.add(Tuple2.of(timedOutPattern, computationState.getStartTimestamp() + windowTime));
 				}
 
 				sharedBufferAccessor.releaseNode(computationState.getPreviousBufferEntry());
@@ -265,7 +295,6 @@ private boolean isFinalState(ComputationState state) {
 		sharedBufferAccessor.advanceTime(timestamp);
 
 		return timeoutResult;
-
 	}
 
 	private boolean isStateTimedOut(final ComputationState state, final long timestamp) {
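
Note the behavioral change in advanceTime above: a timed out partial match is
now reported with computationState.getStartTimestamp() + windowTime instead of
the current advancing timestamp, which matches the TimedOutPartialMatchHandler
contract (timestamp of the first matched element plus the within duration).
For example, with within(Time.seconds(10)) and a partial match started at
event time 1000, the timed out match carries timestamp 11000 regardless of how
far the watermark has advanced.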
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
deleted file mode 100644
index 3aca758ebad..00000000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.functions.NullByteKeySelector;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.PatternFlatTimeoutFunction;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.PatternStream;
-import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.cep.pattern.Pattern;
-import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.KeyedStream;
-import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.util.OutputTag;
-
-/**
- * Utility methods for creating {@link PatternStream}.
- */
-public class CEPOperatorUtils {
-
-	/**
-	 * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param <IN> type of input events
-	 * @param <OUT> type of output events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction}
-	 */
-	public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternSelectFunction<IN, OUT> selectFunction,
-			final TypeInformation<OUT> outTypeInfo,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new SelectCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "SelectCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalSelectCepOperator";
-			}
-		});
-	}
-
-	/**
-	 * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param <IN> type of input events
-	 * @param <OUT> type of output events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction}
-	 */
-	public static <IN, OUT> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternFlatSelectFunction<IN, OUT> selectFunction,
-			final TypeInformation<OUT> outTypeInfo,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, false, comparator, new OperatorBuilder<IN, OUT>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new FlatSelectCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "FlatSelectCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalFlatSelectCepOperator";
-			}
-		});
-	}
-
-	/**
-	 * Creates a data stream containing results of {@link PatternFlatSelectFunction} to fully matching event patterns and
-	 * also timed out partially matched with applied {@link PatternFlatTimeoutFunction} as a sideoutput.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param outputTag {@link OutputTag} for a side-output with timed out matches
-	 * @param timeoutFunction function to be applied to timed out event sequences
-	 * @param <IN> type of input events
-	 * @param <OUT1> type of fully matched events
-	 * @param <OUT2> type of timed out events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternFlatSelectFunction} that
-	 * contains timed out patterns with applied {@link PatternFlatTimeoutFunction} as side-output
-	 */
-	public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternFlatSelectFunction<IN, OUT1> selectFunction,
-			final TypeInformation<OUT1> outTypeInfo,
-			final OutputTag<OUT2> outputTag,
-			final PatternFlatTimeoutFunction<IN, OUT2> timeoutFunction,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT1> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new FlatSelectTimeoutCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					timeoutFunction,
-					outputTag,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "FlatSelectTimeoutCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalFlatSelectTimeoutCepOperator";
-			}
-		});
-	}
-
-	/**
-	 * Creates a data stream containing results of {@link PatternSelectFunction} to fully matching event patterns and
-	 * also timed out partially matched with applied {@link PatternTimeoutFunction} as a sideoutput.
-	 *
-	 * @param inputStream stream of input events
-	 * @param pattern pattern to be search for in the stream
-	 * @param selectFunction function to be applied to matching event sequences
-	 * @param outTypeInfo output TypeInformation of selectFunction
-	 * @param outputTag {@link OutputTag} for a side-output with timed out matches
-	 * @param timeoutFunction function to be applied to timed out event sequences
-	 * @param <IN> type of input events
-	 * @param <OUT1> type of fully matched events
-	 * @param <OUT2> type of timed out events
-	 * @return Data stream containing fully matched event sequence with applied {@link PatternSelectFunction} that
-	 * contains timed out patterns with applied {@link PatternTimeoutFunction} as side-output
-	 */
-	public static <IN, OUT1, OUT2> SingleOutputStreamOperator<OUT1> createTimeoutPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final EventComparator<IN> comparator,
-			final PatternSelectFunction<IN, OUT1> selectFunction,
-			final TypeInformation<OUT1> outTypeInfo,
-			final OutputTag<OUT2> outputTag,
-			final PatternTimeoutFunction<IN, OUT2> timeoutFunction,
-			final OutputTag<IN> lateDataOutputTag) {
-		return createPatternStream(inputStream, pattern, outTypeInfo, true, comparator, new OperatorBuilder<IN, OUT1>() {
-			@Override
-			public OneInputStreamOperator<IN, OUT1> build(
-				TypeSerializer<IN> inputSerializer,
-				boolean isProcessingTime,
-				NFACompiler.NFAFactory<IN> nfaFactory,
-				EventComparator<IN> comparator,
-				AfterMatchSkipStrategy skipStrategy) {
-				return new SelectTimeoutCepOperator<>(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					skipStrategy,
-					selectFunction,
-					timeoutFunction,
-					outputTag,
-					lateDataOutputTag
-				);
-			}
-
-			@Override
-			public String getKeyedOperatorName() {
-				return "SelectTimeoutCepOperator";
-			}
-
-			@Override
-			public String getOperatorName() {
-				return "GlobalSelectTimeoutCepOperator";
-			}
-		});
-	}
-
-	private static <IN, OUT, K> SingleOutputStreamOperator<OUT> createPatternStream(
-			final DataStream<IN> inputStream,
-			final Pattern<IN, ?> pattern,
-			final TypeInformation<OUT> outTypeInfo,
-			final boolean timeoutHandling,
-			final EventComparator<IN> comparator,
-			final OperatorBuilder<IN, OUT> operatorBuilder) {
-		final TypeSerializer<IN> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig());
-
-		// check whether we use processing time
-		final boolean isProcessingTime = inputStream.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
-
-		// compile our pattern into a NFAFactory to instantiate NFAs later on
-		final NFACompiler.NFAFactory<IN> nfaFactory = NFACompiler.compileFactory(pattern, timeoutHandling);
-
-		final SingleOutputStreamOperator<OUT> patternStream;
-
-		if (inputStream instanceof KeyedStream) {
-			KeyedStream<IN, K> keyedStream = (KeyedStream<IN, K>) inputStream;
-
-			patternStream = keyedStream.transform(
-				operatorBuilder.getKeyedOperatorName(),
-				outTypeInfo,
-				operatorBuilder.build(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					pattern.getAfterMatchSkipStrategy()));
-		} else {
-			KeySelector<IN, Byte> keySelector = new NullByteKeySelector<>();
-
-			patternStream = inputStream.keyBy(keySelector).transform(
-				operatorBuilder.getOperatorName(),
-				outTypeInfo,
-				operatorBuilder.build(
-					inputSerializer,
-					isProcessingTime,
-					nfaFactory,
-					comparator,
-					pattern.getAfterMatchSkipStrategy()
-				)).forceNonParallel();
-		}
-
-		return patternStream;
-	}
-
-	private interface OperatorBuilder<IN, OUT> {
-		OneInputStreamOperator<IN, OUT> build(
-			TypeSerializer<IN> inputSerializer,
-			boolean isProcessingTime,
-			NFACompiler.NFAFactory<IN> nfaFactory,
-			EventComparator<IN> comparator,
-			AfterMatchSkipStrategy skipStrategy);
-
-		String getKeyedOperatorName();
-
-		String getOperatorName();
-	}
-}
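
Note on the removed createPatternStream helper above: the else-branch is what lets CEP run on non-keyed streams. Below is a minimal sketch of that dispatch idiom in isolation, under the assumption that the caller supplies the operator and the output type (the class and method names here are illustrative, not Flink API):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.NullByteKeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

public final class GlobalOperatorDispatch {

    // Apply 'operator' to 'input'; a non-keyed input is keyed on a constant
    // null byte (so keyed state and timers still work) and pinned to
    // parallelism 1, matching the "Global...Operator" naming above.
    public static <IN, OUT> SingleOutputStreamOperator<OUT> apply(
            DataStream<IN> input,
            TypeInformation<OUT> outTypeInfo,
            OneInputStreamOperator<IN, OUT> operator) {

        if (input instanceof KeyedStream) {
            // keyed input: run the operator per key, in parallel
            return input.transform("KeyedOp", outTypeInfo, operator);
        }
        // non-keyed input: every element gets the same (byte) key
        return input
                .keyBy(new NullByteKeySelector<IN>())
                .transform("GlobalOp", outTypeInfo, operator)
                .forceNonParallel();
    }

    private GlobalOperatorDispatch() {}
}
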
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
similarity index 74%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
rename to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
index c603741cea5..6df1fa020ee 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepOperator.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.cep.operator;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.util.FunctionUtils;
 import org.apache.flink.api.common.state.MapState;
 import org.apache.flink.api.common.state.MapStateDescriptor;
@@ -30,31 +30,36 @@
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.NFA.MigratedNFA;
 import org.apache.flink.cep.nfa.NFAState;
 import org.apache.flink.cep.nfa.NFAStateSerializer;
-import org.apache.flink.cep.nfa.State;
-import org.apache.flink.cep.nfa.StateTransition;
 import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBuffer;
 import org.apache.flink.cep.nfa.sharedbuffer.SharedBufferAccessor;
-import org.apache.flink.cep.pattern.conditions.IterativeCondition;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.KeyedStateFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -64,18 +69,17 @@
 import java.util.stream.Stream;
 
 /**
- * Abstract CEP pattern operator for a keyed input stream. For each key, the operator creates
+ * CEP pattern operator for a keyed input stream. For each key, the operator creates
  * a {@link NFA} and a priority queue to buffer out of order elements. Both data structures are
- * stored using the managed keyed state. Additionally, the set of all seen keys is kept as part of the
- * operator state. This is necessary to trigger the execution for all keys upon receiving a new
- * watermark.
+ * stored using the managed keyed state.
  *
  * @param <IN> Type of the input elements
  * @param <KEY> Type of the key on which the input stream is keyed
  * @param <OUT> Type of the output elements
  */
-public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT, F extends Function>
-		extends AbstractUdfStreamOperator<OUT, F>
+@Internal
+public class CepOperator<IN, KEY, OUT>
+		extends AbstractUdfStreamOperator<OUT, PatternProcessFunction<IN, OUT>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable<KEY, VoidNamespace> {
 
 	private static final long serialVersionUID = -4166778210774160757L;
@@ -105,29 +109,41 @@
 	 */
 	private long lastWatermark;
 
+	/** Comparator for secondary sorting. Primary sorting is always done on time. */
 	private final EventComparator<IN> comparator;
 
 	/**
 	 * {@link OutputTag} to use for late arriving events. Elements with timestamp smaller than
 	 * the current watermark will be emitted to this.
 	 */
-	protected final OutputTag<IN> lateDataOutputTag;
+	private final OutputTag<IN> lateDataOutputTag;
+
+	/** Strategy defining which elements to skip after a match was found. */
+	private final AfterMatchSkipStrategy afterMatchSkipStrategy;
+
+	/** Context passed to user function. */
+	private transient ContextFunctionImpl context;
+
+	/** Main output collector, which sets a proper timestamp on the StreamRecord. */
+	private transient TimestampedCollector<OUT> collector;
 
-	protected final AfterMatchSkipStrategy afterMatchSkipStrategy;
+	/** Wrapped RuntimeContext that limits the underlying context features. */
+	private transient CepRuntimeContext cepRuntimeContext;
 
-	public AbstractKeyedCEPPatternOperator(
+	public CepOperator(
 			final TypeSerializer<IN> inputSerializer,
 			final boolean isProcessingTime,
 			final NFACompiler.NFAFactory<IN> nfaFactory,
-			final EventComparator<IN> comparator,
-			final AfterMatchSkipStrategy afterMatchSkipStrategy,
-			final F function,
-			final OutputTag<IN> lateDataOutputTag) {
+			@Nullable final EventComparator<IN> comparator,
+			@Nullable final AfterMatchSkipStrategy afterMatchSkipStrategy,
+			final PatternProcessFunction<IN, OUT> function,
+			@Nullable final OutputTag<IN> lateDataOutputTag) {
 		super(function);
 
 		this.inputSerializer = Preconditions.checkNotNull(inputSerializer);
-		this.isProcessingTime = Preconditions.checkNotNull(isProcessingTime);
 		this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
+
+		this.isProcessingTime = isProcessingTime;
 		this.comparator = comparator;
 		this.lateDataOutputTag = lateDataOutputTag;
 
@@ -138,6 +154,13 @@ public AbstractKeyedCEPPatternOperator(
 		}
 	}
 
+	@Override
+	public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
+		super.setup(containingTask, config, output);
+		this.cepRuntimeContext = new CepRuntimeContext(getRuntimeContext());
+		FunctionUtils.setFunctionRuntimeContext(getUserFunction(), this.cepRuntimeContext);
+	}
+
 	@Override
 	public void initializeState(StateInitializationContext context) throws Exception {
 		super.initializeState(context);
@@ -183,22 +206,22 @@ public void process(Object key, ValueState<MigratedNFA<IN>> state) throws Except
 	@Override
 	public void open() throws Exception {
 		super.open();
-
 		timerService = getInternalTimerService(
 				"watermark-callbacks",
 				VoidNamespaceSerializer.INSTANCE,
 				this);
 
-		this.nfa = nfaFactory.createNFA();
+		nfa = nfaFactory.createNFA();
+		nfa.open(cepRuntimeContext, new Configuration());
 
-		openNFA(nfa);
+		context = new ContextFunctionImpl();
+		collector = new TimestampedCollector<>(output);
 	}
 
 	@Override
 	public void close() throws Exception {
 		super.close();
-
-		closeNFA(nfa);
+		nfa.close();
 	}
 
 	@Override
@@ -404,54 +427,102 @@ private void processEvent(NFAState nfaState, IN event, long timestamp) throws Ex
 	private void advanceTime(NFAState nfaState, long timestamp) throws Exception {
 		try (SharedBufferAccessor<IN> sharedBufferAccessor = partialMatches.getAccessor()) {
 			Collection<Tuple2<Map<String, List<IN>>, Long>> timedOut =
-				nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
-			processTimedOutSequences(timedOut, timestamp);
+					nfa.advanceTime(sharedBufferAccessor, nfaState, timestamp);
+			if (!timedOut.isEmpty()) {
+				processTimedOutSequences(timedOut);
+			}
 		}
 	}
 
-	private void openNFA(NFA<IN> nfa) throws Exception {
-		Configuration conf = new Configuration();
-		for (State<IN> state : nfa.getStates()) {
-			for (StateTransition<IN> transition : state.getStateTransitions()) {
-				IterativeCondition condition = transition.getCondition();
-				FunctionUtils.setFunctionRuntimeContext(condition, getRuntimeContext());
-				FunctionUtils.openFunction(condition, conf);
-			}
+	private void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
+		PatternProcessFunction<IN, OUT> function = getUserFunction();
+		setTimestamp(timestamp);
+		for (Map<String, List<IN>> matchingSequence : matchingSequences) {
+			function.processMatch(matchingSequence, context, collector);
 		}
 	}
 
-	private void closeNFA(NFA<IN> nfa) throws Exception {
-		for (State<IN> state : nfa.getStates()) {
-			for (StateTransition<IN> transition : state.getStateTransitions()) {
-				IterativeCondition condition = transition.getCondition();
-				FunctionUtils.closeFunction(condition);
+	private void processTimedOutSequences(Collection<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences) throws Exception {
+		PatternProcessFunction<IN, OUT> function = getUserFunction();
+		if (function instanceof TimedOutPartialMatchHandler) {
+
+			@SuppressWarnings("unchecked")
+			TimedOutPartialMatchHandler<IN> timeoutHandler = (TimedOutPartialMatchHandler<IN>) function;
+
+			for (Tuple2<Map<String, List<IN>>, Long> matchingSequence : timedOutSequences) {
+				setTimestamp(matchingSequence.f1);
+				timeoutHandler.processTimedOutMatch(matchingSequence.f0, context);
 			}
 		}
 	}
 
-	protected abstract void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception;
+	private void setTimestamp(long timestamp) {
+		if (!isProcessingTime) {
+			collector.setAbsoluteTimestamp(timestamp);
+			context.setTimestamp(timestamp);
+		}
+	}
 
-	protected void processTimedOutSequences(
-			Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences,
-			long timestamp) throws Exception {
+	/**
+	 * Implementation of {@link PatternProcessFunction.Context}. Designed to be instantiated once per operator.
+	 * It serves three purposes:
+	 *  <ul>
+	 *      <li>gives access to the current processing time through the {@link InternalTimerService}</li>
+	 *      <li>gives access to the timestamp of the current record (or null in processing time)</li>
+	 *      <li>enables side outputs, attaching the proper timestamp to the emitted StreamRecord based on
+	 *          either processing or event time</li>
+	 *  </ul>
+	 */
+	private class ContextFunctionImpl implements PatternProcessFunction.Context {
+
+		private Long timestamp;
+
+		@Override
+		public <X> void output(final OutputTag<X> outputTag, final X value) {
+			final StreamRecord<X> record;
+			if (isProcessingTime) {
+				record = new StreamRecord<>(value);
+			} else {
+				record = new StreamRecord<>(value, timestamp());
+			}
+			output.collect(outputTag, record);
+		}
+
+		void setTimestamp(long timestamp) {
+			this.timestamp = timestamp;
+		}
+
+		@Override
+		public Long timestamp() {
+			if (isProcessingTime) {
+				return null;
+			} else {
+				return timestamp;
+			}
+		}
+
+		@Override
+		public long currentProcessingTime() {
+			return timerService.currentProcessingTime();
+		}
 	}
 
 	//////////////////////			Testing Methods			//////////////////////
 
 	@VisibleForTesting
-	public boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
+	boolean hasNonEmptySharedBuffer(KEY key) throws Exception {
 		setCurrentKey(key);
 		return !partialMatches.isEmpty();
 	}
 
 	@VisibleForTesting
-	public boolean hasNonEmptyPQ(KEY key) throws Exception {
+	boolean hasNonEmptyPQ(KEY key) throws Exception {
 		setCurrentKey(key);
 		return elementQueueState.keys().iterator().hasNext();
 	}
 
 	@VisibleForTesting
-	public int getPQSize(KEY key) throws Exception {
+	int getPQSize(KEY key) throws Exception {
 		setCurrentKey(key);
 		int counter = 0;
 		for (List<IN> elements: elementQueueState.values()) {
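
The consolidated operator above hands matches to a single PatternProcessFunction, and timeout handling is now opt-in via TimedOutPartialMatchHandler (see processTimedOutSequences). A minimal sketch of a user function exercising both paths, mirroring the TimedOutProcessFunction added to the tests further down (the String event type is illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class MatchAndTimeoutFunction
        extends PatternProcessFunction<String, String>
        implements TimedOutPartialMatchHandler<String> {

    private final OutputTag<Tuple2<Map<String, List<String>>, Long>> timedOutTag;

    public MatchAndTimeoutFunction(OutputTag<Tuple2<Map<String, List<String>>, Long>> timedOutTag) {
        this.timedOutTag = timedOutTag;
    }

    @Override
    public void processMatch(
            Map<String, List<String>> match,
            Context ctx,
            Collector<String> out) {
        // full matches go to the main output with the match timestamp
        out.collect("matched: " + match.keySet());
    }

    @Override
    public void processTimedOutMatch(Map<String, List<String>> match, Context ctx) {
        // ctx.timestamp() is the timeout timestamp in event time, null in processing time
        ctx.output(timedOutTag, Tuple2.of(match, ctx.timestamp()));
    }
}
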
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
similarity index 75%
rename from flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
rename to flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
index ed93f8e52cd..0518dadecb8 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/CepRuntimeContext.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CepRuntimeContext.java
@@ -16,8 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep;
+package org.apache.flink.cep.operator;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
@@ -45,17 +46,23 @@
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
- * A wrapper class for pattern select function and iterative condition function's {@link RuntimeContext}.
- * The runtime context only supports basic operations. Consequently, state access, accumulators,
- * broadcast variables and the distributed cache are disabled.
+ * A wrapper class for the {@link RuntimeContext}.
+ *
+ * <p>This context only exposes the functionality needed by the
+ * pattern process function and iterative condition function.
+ * Consequently, state access, accumulators, broadcast variables
+ * and the distributed cache are disabled.
  */
-public class CepRuntimeContext implements RuntimeContext {
+@Internal
+class CepRuntimeContext implements RuntimeContext {
 
 	private final RuntimeContext runtimeContext;
 
-	public CepRuntimeContext(RuntimeContext runtimeContext) {
-		this.runtimeContext = runtimeContext;
+	CepRuntimeContext(final RuntimeContext runtimeContext) {
+		this.runtimeContext = checkNotNull(runtimeContext);
 	}
 
 	@Override
@@ -114,12 +121,13 @@ public DistributedCache getDistributedCache() {
 
 	@Override
 	public <V, A extends Serializable> void addAccumulator(
-		String name, Accumulator<V, A> accumulator) {
+			final String name,
+			final Accumulator<V, A> accumulator) {
 		throw new UnsupportedOperationException("Accumulators are not supported.");
 	}
 
 	@Override
-	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(String name) {
+	public <V, A extends Serializable> Accumulator<V, A> getAccumulator(final String name) {
 		throw new UnsupportedOperationException("Accumulators are not supported.");
 	}
 
@@ -129,68 +137,70 @@ public DistributedCache getDistributedCache() {
 	}
 
 	@Override
-	public IntCounter getIntCounter(String name) {
+	public IntCounter getIntCounter(final String name) {
 		throw new UnsupportedOperationException("Int counters are not supported.");
 	}
 
 	@Override
-	public LongCounter getLongCounter(String name) {
+	public LongCounter getLongCounter(final String name) {
 		throw new UnsupportedOperationException("Long counters are not supported.");
 	}
 
 	@Override
-	public DoubleCounter getDoubleCounter(String name) {
+	public DoubleCounter getDoubleCounter(final String name) {
 		throw new UnsupportedOperationException("Double counters are not supported.");
 	}
 
 	@Override
-	public Histogram getHistogram(String name) {
+	public Histogram getHistogram(final String name) {
 		throw new UnsupportedOperationException("Histograms are not supported.");
 	}
 
 	@Override
-	public boolean hasBroadcastVariable(String name) {
+	public boolean hasBroadcastVariable(final String name) {
 		throw new UnsupportedOperationException("Broadcast variables are not supported.");
 	}
 
 	@Override
-	public <RT> List<RT> getBroadcastVariable(String name) {
+	public <RT> List<RT> getBroadcastVariable(final String name) {
 		throw new UnsupportedOperationException("Broadcast variables are not supported.");
 	}
 
 	@Override
 	public <T, C> C getBroadcastVariableWithInitializer(
-		String name, BroadcastVariableInitializer<T, C> initializer) {
+			final String name,
+			final BroadcastVariableInitializer<T, C> initializer) {
 		throw new UnsupportedOperationException("Broadcast variables are not supported.");
 	}
 
 	@Override
-	public <T> ValueState<T> getState(ValueStateDescriptor<T> stateProperties) {
+	public <T> ValueState<T> getState(final ValueStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <T> ListState<T> getListState(ListStateDescriptor<T> stateProperties) {
+	public <T> ListState<T> getListState(final ListStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <T> ReducingState<T> getReducingState(ReducingStateDescriptor<T> stateProperties) {
+	public <T> ReducingState<T> getReducingState(final ReducingStateDescriptor<T> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
+	public <IN, ACC, OUT> AggregatingState<IN, OUT> getAggregatingState(
+			final AggregatingStateDescriptor<IN, ACC, OUT> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <T, ACC> FoldingState<T, ACC> getFoldingState(FoldingStateDescriptor<T, ACC> stateProperties) {
+	public <T, ACC> FoldingState<T, ACC> getFoldingState(final FoldingStateDescriptor<T, ACC> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 
 	@Override
-	public <UK, UV> MapState<UK, UV> getMapState(MapStateDescriptor<UK, UV> stateProperties) {
+	public <UK, UV> MapState<UK, UV> getMapState(final MapStateDescriptor<UK, UV> stateProperties) {
 		throw new UnsupportedOperationException("State is not supported.");
 	}
 }
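
Since every unsupported feature in this wrapper throws UnsupportedOperationException, rich conditions fail fast if they reach for state. A hedged sketch of a condition that would trip over this restriction at open() time (the class name and state descriptor are made up for illustration):

import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
import org.apache.flink.configuration.Configuration;

public class StatefulCondition extends RichIterativeCondition<String> {

    @Override
    public void open(Configuration parameters) throws Exception {
        // Under CepRuntimeContext (installed by CepOperator#setup) this call
        // throws UnsupportedOperationException("State is not supported.").
        getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public boolean filter(String value, Context<String> ctx) {
        return true;
    }
}
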
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
deleted file mode 100644
index df54b53f999..00000000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectCepOperator.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies the given {@link PatternFlatSelectFunction} to fully matched event patterns.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT> Type of the output elements
- */
-public class FlatSelectCepOperator<IN, KEY, OUT>
-	extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternFlatSelectFunction<IN, OUT>> {
-	private static final long serialVersionUID = 5845993459551561518L;
-
-	public FlatSelectCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternFlatSelectFunction<IN, OUT> function,
-		OutputTag<IN> lateDataOutputTag) {
-		super(inputSerializer, isProcessingTime, nfaFactory, comparator, skipStrategy, function, lateDataOutputTag);
-	}
-
-	private transient TimestampedCollector<OUT> collector;
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-	}
-
-	@Override
-	protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			collector.setAbsoluteTimestamp(timestamp);
-			getUserFunction().flatSelect(match, collector);
-		}
-	}
-}
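
The flat-select variant deleted here needed its own operator only to loop a collector over each match; with PatternProcessFunction the Collector is part of the signature, so the same behaviour becomes a plain user function. A small sketch (String events are illustrative):

import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Map;

public class FlatSelectEquivalent extends PatternProcessFunction<String, String> {

    @Override
    public void processMatch(
            Map<String, List<String>> match,
            Context ctx,
            Collector<String> out) {
        // "flat" select: the Collector may be called any number of times per match
        for (List<String> events : match.values()) {
            for (String event : events) {
                out.collect(event); // emit one record per matched event
            }
        }
    }
}
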
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
deleted file mode 100644
index 642c92aaffd..00000000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/FlatSelectTimeoutCepOperator.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternFlatSelectFunction;
-import org.apache.flink.cep.PatternFlatTimeoutFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies the given {@link PatternFlatSelectFunction} to fully
- * matched event patterns and the given {@link PatternFlatTimeoutFunction} to timed out ones. The timed out elements are
- * emitted as a side output.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT1> Type of the output elements
- * @param <OUT2> Type of the timed out output elements
- */
-public class FlatSelectTimeoutCepOperator<IN, OUT1, OUT2, KEY> extends
-	AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, FlatSelectTimeoutCepOperator.FlatSelectWrapper<IN, OUT1, OUT2>> {
-
-	private transient TimestampedCollector<OUT1> collector;
-
-	private transient TimestampedSideOutputCollector<OUT2> sideOutputCollector;
-
-	private OutputTag<OUT2> timedOutOutputTag;
-
-	public FlatSelectTimeoutCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternFlatSelectFunction<IN, OUT1> flatSelectFunction,
-		PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction,
-		OutputTag<OUT2> outputTag,
-		OutputTag<IN> lateDataOutputTag) {
-		super(
-			inputSerializer,
-			isProcessingTime,
-			nfaFactory,
-			comparator,
-			skipStrategy,
-			new FlatSelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
-			lateDataOutputTag);
-		this.timedOutOutputTag = outputTag;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-		sideOutputCollector = new TimestampedSideOutputCollector<>(timedOutOutputTag, output);
-	}
-
-	@Override
-	protected void processMatchedSequences(
-		Iterable<Map<String, List<IN>>> matchingSequences,
-		long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			getUserFunction().getFlatSelectFunction().flatSelect(match, collector);
-		}
-	}
-
-	@Override
-	protected void processTimedOutSequences(
-		Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
-		for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
-			sideOutputCollector.setAbsoluteTimestamp(timestamp);
-			getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1, sideOutputCollector);
-		}
-	}
-
-	/**
-	 * Wrapper that enables storing a {@link PatternFlatSelectFunction} and a {@link PatternFlatTimeoutFunction}
-	 * in one UDF.
-	 */
-	@Internal
-	public static class FlatSelectWrapper<IN, OUT1, OUT2> implements Function {
-
-		private static final long serialVersionUID = -8320546120157150202L;
-
-		private PatternFlatSelectFunction<IN, OUT1> flatSelectFunction;
-		private PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction;
-
-		@VisibleForTesting
-		public PatternFlatSelectFunction<IN, OUT1> getFlatSelectFunction() {
-			return flatSelectFunction;
-		}
-
-		@VisibleForTesting
-		public PatternFlatTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() {
-			return flatTimeoutFunction;
-		}
-
-		public FlatSelectWrapper(
-			PatternFlatSelectFunction<IN, OUT1> flatSelectFunction,
-			PatternFlatTimeoutFunction<IN, OUT2> flatTimeoutFunction) {
-			this.flatSelectFunction = flatSelectFunction;
-			this.flatTimeoutFunction = flatTimeoutFunction;
-		}
-	}
-}
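
The FlatSelectWrapper removed above exists because AbstractUdfStreamOperator manages exactly one user Function (for serialization, open/close, and runtime-context wiring), so two user callbacks had to be bundled into a single composite. The idiom in isolation, sketched generically (names are illustrative):

import org.apache.flink.api.common.functions.Function;

// A single serializable Function that carries two delegate functions,
// so an AbstractUdfStreamOperator can manage both through one handle.
public class PairWrapper<A extends Function, B extends Function> implements Function {

    private static final long serialVersionUID = 1L;

    private final A first;
    private final B second;

    public PairWrapper(A first, B second) {
        this.first = first;
        this.second = second;
    }

    public A getFirst() { return first; }

    public B getSecond() { return second; }
}
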
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
deleted file mode 100644
index ad335e52b21..00000000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectCepOperator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies the given {@link PatternSelectFunction} to fully matched event patterns.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT> Type of the output elements
- */
-public class SelectCepOperator<IN, KEY, OUT>
-	extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT, PatternSelectFunction<IN, OUT>> {
-	public SelectCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternSelectFunction<IN, OUT> function,
-		OutputTag<IN> lateDataOutputTag) {
-		super(inputSerializer, isProcessingTime, nfaFactory, comparator, skipStrategy, function, lateDataOutputTag);
-	}
-
-	@Override
-	protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			output.collect(new StreamRecord<>(getUserFunction().select(match), timestamp));
-		}
-	}
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
deleted file mode 100644
index 73ac709f850..00000000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/SelectTimeoutCepOperator.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.PatternTimeoutFunction;
-import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
-import org.apache.flink.cep.nfa.compiler.NFACompiler;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.OutputTag;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- * Version of {@link AbstractKeyedCEPPatternOperator} that applies the given {@link PatternSelectFunction} to fully
- * matched event patterns and the given {@link PatternTimeoutFunction} to timed out ones. The timed out elements are
- * emitted as a side output.
- *
- * @param <IN> Type of the input elements
- * @param <KEY> Type of the key on which the input stream is keyed
- * @param <OUT1> Type of the output elements
- * @param <OUT2> Type of the timed out output elements
- */
-public class SelectTimeoutCepOperator<IN, OUT1, OUT2, KEY>
-	extends AbstractKeyedCEPPatternOperator<IN, KEY, OUT1, SelectTimeoutCepOperator.SelectWrapper<IN, OUT1, OUT2>> {
-
-	private OutputTag<OUT2> timedOutOutputTag;
-
-	public SelectTimeoutCepOperator(
-		TypeSerializer<IN> inputSerializer,
-		boolean isProcessingTime,
-		NFACompiler.NFAFactory<IN> nfaFactory,
-		final EventComparator<IN> comparator,
-		AfterMatchSkipStrategy skipStrategy,
-		PatternSelectFunction<IN, OUT1> flatSelectFunction,
-		PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction,
-		OutputTag<OUT2> outputTag,
-		OutputTag<IN> lateDataOutputTag) {
-		super(
-			inputSerializer,
-			isProcessingTime,
-			nfaFactory,
-			comparator,
-			skipStrategy,
-			new SelectWrapper<>(flatSelectFunction, flatTimeoutFunction),
-			lateDataOutputTag);
-		this.timedOutOutputTag = outputTag;
-	}
-
-	@Override
-	protected void processMatchedSequences(Iterable<Map<String, List<IN>>> matchingSequences, long timestamp) throws Exception {
-		for (Map<String, List<IN>> match : matchingSequences) {
-			output.collect(new StreamRecord<>(getUserFunction().getFlatSelectFunction().select(match), timestamp));
-		}
-	}
-
-	@Override
-	protected void processTimedOutSequences(
-		Iterable<Tuple2<Map<String, List<IN>>, Long>> timedOutSequences, long timestamp) throws Exception {
-		for (Tuple2<Map<String, List<IN>>, Long> match : timedOutSequences) {
-			output.collect(timedOutOutputTag,
-				new StreamRecord<>(
-					getUserFunction().getFlatTimeoutFunction().timeout(match.f0, match.f1),
-					timestamp));
-		}
-	}
-
-	/**
-	 * Wrapper that enables storing a {@link PatternSelectFunction} and a {@link PatternTimeoutFunction} in one UDF.
-	 *
-	 * @param <IN> Type of the input elements
-	 * @param <OUT1> Type of the output elements
-	 * @param <OUT2> Type of the timed out output elements
-	 */
-	@Internal
-	public static class SelectWrapper<IN, OUT1, OUT2> implements Function {
-
-		private static final long serialVersionUID = -8320546120157150202L;
-
-		private PatternSelectFunction<IN, OUT1> flatSelectFunction;
-		private PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction;
-
-		PatternSelectFunction<IN, OUT1> getFlatSelectFunction() {
-			return flatSelectFunction;
-		}
-
-		PatternTimeoutFunction<IN, OUT2> getFlatTimeoutFunction() {
-			return flatTimeoutFunction;
-		}
-
-		public SelectWrapper(
-			PatternSelectFunction<IN, OUT1> flatSelectFunction,
-			PatternTimeoutFunction<IN, OUT2> flatTimeoutFunction) {
-			this.flatSelectFunction = flatSelectFunction;
-			this.flatTimeoutFunction = flatTimeoutFunction;
-		}
-	}
-}
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
deleted file mode 100644
index 53365439012..00000000000
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimestampedSideOutputCollector.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cep.operator;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.OutputTag;
-
-/**
- * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
- * Before giving the {@link TimestampedSideOutputCollector} to a user function you must set
- * the timestamp that should be attached to emitted elements. Most operators
- * would set the timestamp of the incoming
- * {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
- *
- * <p>This version emits results into a side output specified by the given {@link OutputTag}.
- *
- * @param <T> The type of the elements that can be emitted.
- */
-@Internal
-public class TimestampedSideOutputCollector<T> implements Collector<T> {
-
-	private final Output<?> output;
-
-	private final StreamRecord<T> reuse;
-
-	private final OutputTag<T> outputTag;
-
-	/**
-	 * Creates a new {@link TimestampedSideOutputCollector} that wraps the given {@link Output} and collects
-	 * results into the side output corresponding to the given {@link OutputTag}.
-	 */
-	public TimestampedSideOutputCollector(OutputTag<T> outputTag, Output<?> output) {
-		this.output = output;
-		this.outputTag = outputTag;
-		this.reuse = new StreamRecord<T>(null);
-	}
-
-	@Override
-	public void collect(T record) {
-		output.collect(outputTag, reuse.replace(record));
-	}
-
-	public void setTimestamp(StreamRecord<?> timestampBase) {
-		if (timestampBase.hasTimestamp()) {
-			reuse.setTimestamp(timestampBase.getTimestamp());
-		} else {
-			reuse.eraseTimestamp();
-		}
-	}
-
-	public void setAbsoluteTimestamp(long timestamp) {
-		reuse.setTimestamp(timestamp);
-	}
-
-	public void eraseTimestamp() {
-		reuse.eraseTimestamp();
-	}
-
-	@Override
-	public void close() {
-		output.close();
-	}
-}
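
With this collector gone, side outputs from user code flow through PatternProcessFunction.Context#output, which the operator implements so that records carry the match timestamp in event time and no timestamp in processing time (see ContextFunctionImpl above). A brief sketch of the call site (the tag name and payload are illustrative):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class SideOutputExample extends PatternProcessFunction<String, String> {

    // hypothetical side-output tag for illustration
    private static final OutputTag<String> DEBUG =
            new OutputTag<>("debug", TypeInformation.of(String.class));

    @Override
    public void processMatch(
            Map<String, List<String>> match,
            Context ctx,
            Collector<String> out) {
        // ctx.timestamp() may be null when running in processing time
        ctx.output(DEBUG, "match at " + ctx.timestamp());
        out.collect(String.valueOf(match.size()));
    }
}
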
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
index 12f017e9946..da20321eccc 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/RichIterativeCondition.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.cep.CepRuntimeContext;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.Preconditions;
 
@@ -46,12 +45,7 @@
 	@Override
 	public void setRuntimeContext(RuntimeContext runtimeContext) {
 		Preconditions.checkNotNull(runtimeContext);
-
-		if (runtimeContext instanceof CepRuntimeContext) {
-			this.runtimeContext = runtimeContext;
-		} else {
-			this.runtimeContext = new CepRuntimeContext(runtimeContext);
-		}
+		this.runtimeContext = runtimeContext;
 	}
 
 	@Override
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
index 6ffd5d2de62..2391d547fcb 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/nfa/NFAITCase.java
@@ -378,9 +378,9 @@ public void testSimplePatternWithTimeoutHandling() throws Exception {
 		timeoutPattern4.put("start", Collections.singletonList(new Event(2, "start", 1.0)));
 
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern1, 11L));
-		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 13L));
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern2, 12L));
 		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern3, 11L));
-		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 13L));
+		expectedTimeoutPatterns.add(Tuple2.of(timeoutPattern4, 12L));
 
 		Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
 			private static final long serialVersionUID = 7907391379273505897L;
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 3f249a8ab5e..2e4b41d6a1c 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -26,9 +26,9 @@
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
 import org.apache.flink.cep.Event;
-import org.apache.flink.cep.PatternSelectFunction;
-import org.apache.flink.cep.PatternTimeoutFunction;
 import org.apache.flink.cep.SubEvent;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.cep.pattern.Pattern;
@@ -43,6 +43,7 @@
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.TestLogger;
 
@@ -69,7 +70,7 @@
 import static org.mockito.Mockito.validateMockitoUsage;
 
 /**
- * Tests for {@link AbstractKeyedCEPPatternOperator}.
+ * Tests for {@link CepOperator}.
  */
 public class CEPOperatorTest extends TestLogger {
 
@@ -254,32 +255,14 @@ public void testKeyedAdvancingTimeWithoutElements() throws Exception {
 			new OutputTag<Tuple2<Map<String, List<Event>>, Long>>("timedOut") {};
 		final KeyedOneInputStreamOperatorTestHarness<Integer, Event, Map<String, List<Event>>> harness =
 			new KeyedOneInputStreamOperatorTestHarness<>(
-				new SelectTimeoutCepOperator<>(
+				new CepOperator<>(
 					Event.createTypeSerializer(),
 					false,
 					new NFAFactory(true),
 					null,
 					null,
-					new PatternSelectFunction<Event, Map<String, List<Event>>>() {
-						private static final long serialVersionUID = -5768297287711394420L;
-
-						@Override
-						public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
-							return pattern;
-						}
-					},
-					new PatternTimeoutFunction<Event, Tuple2<Map<String, List<Event>>, Long>>() {
-						private static final long serialVersionUID = 2843329425823093249L;
-
-						@Override
-						public Tuple2<Map<String, List<Event>>, Long> timeout(
-							Map<String, List<Event>> pattern,
-							long timeoutTimestamp) throws Exception {
-							return Tuple2.of(pattern, timeoutTimestamp);
-						}
-					},
-					timedOut
-				, null), new KeySelector<Event, Integer>() {
+					new TimedOutProcessFunction(timedOut),
+					null), new KeySelector<Event, Integer>() {
 				private static final long serialVersionUID = 7219185117566268366L;
 
 				@Override
@@ -334,7 +317,7 @@ public Integer getKey(Event value) throws Exception {
 	@Test
 	public void testKeyedCEPOperatorNFAUpdate() throws Exception {
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
@@ -393,7 +376,7 @@ public void testKeyedCEPOperatorNFAUpdateWithRocksDB() throws Exception {
 		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
@@ -455,7 +438,7 @@ public void testKeyedCEPOperatorNFAUpdateWithRocksDB() throws Exception {
 
 	@Test
 	public void testKeyedCEPOperatorNFAUpdateTimes() throws Exception {
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
@@ -497,7 +480,7 @@ public void testKeyedCEPOperatorNFAUpdateTimesWithRocksDB() throws Exception {
 		RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(new MemoryStateBackend());
 		rocksDBStateBackend.setDbStoragePath(rocksDbPath);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			true,
 			new SimpleNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(
@@ -548,7 +531,7 @@ public void testCEPOperatorCleanupEventTime() throws Exception {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(false);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(false);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -593,7 +576,7 @@ public void testCEPOperatorCleanupEventTime() throws Exception {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(false);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(false);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -646,7 +629,7 @@ public void testCEPOperatorCleanupEventTimeWithSameElements() throws Exception {
 		Event middle1Event3 = new Event(41, "a", 4.0);
 		Event middle2Event1 = new Event(41, "b", 5.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			false,
 			new ComplexNFAFactory());
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
@@ -738,7 +721,7 @@ public void testCEPOperatorSideOutputLateElementsEventTime() throws Exception {
 
 		OutputTag<Event> lateDataTag = new OutputTag<Event>("late-data", TypeInformation.of(Event.class));
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			false,
 			new ComplexNFAFactory(),
 			null,
@@ -784,7 +767,7 @@ public void testCEPOperatorCleanupProcessingTime() throws Exception {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(true);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperator(true);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -812,7 +795,7 @@ public void testCEPOperatorCleanupProcessingTime() throws Exception {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(true);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperator(true);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -904,7 +887,7 @@ public boolean filter(Event value) throws Exception {
 			}
 		});
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = CepOperatorTestUtilities.getKeyedCepOpearator(
 			false,
 			new NFACompiler.NFAFactory<Event>() {
 				private static final long serialVersionUID = 477082663248051994L;
@@ -975,7 +958,7 @@ public void testCEPOperatorComparatorProcessTime() throws Exception {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(true);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(true);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -1003,7 +986,7 @@ public void testCEPOperatorComparatorProcessTime() throws Exception {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(true);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -1032,7 +1015,7 @@ public void testCEPOperatorComparatorEventTime() throws Exception {
 
 		Event startEventK2 = new Event(43, "start", 1.0);
 
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);
+		CepOperator<Event, Integer, Map<String, List<Event>>> operator = getKeyedCepOperatorWithComparator(false);
 		OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = CepOperatorTestUtilities.getCepTestHarness(operator);
 
 		try {
@@ -1064,7 +1047,7 @@ public void testCEPOperatorComparatorEventTime() throws Exception {
 			OperatorSubtaskState snapshot = harness.snapshot(0L, 0L);
 			harness.close();
 
-			SelectCepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false);
+			CepOperator<Event, Integer, Map<String, List<Event>>> operator2 = getKeyedCepOperatorWithComparator(false);
 			harness = CepOperatorTestUtilities.getCepTestHarness(operator2);
 			harness.setup();
 			harness.initializeState(snapshot);
@@ -1101,12 +1084,12 @@ private void verifyPattern(Object outputObject, Event start, SubEvent middle, Ev
 		assertEquals(end, patternMap.get("end").get(0));
 	}
 
-	private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperator(
+	private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperator(
 		boolean isProcessingTime) {
 		return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new NFAFactory());
 	}
 
-	private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperatorWithComparator(
+	private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOperatorWithComparator(
 		boolean isProcessingTime) {
 
 		return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new NFAFactory(), new org.apache.flink.cep.EventComparator<Event>() {
@@ -1176,7 +1159,7 @@ public int compare(Event o1, Event o2) {
 		return CepOperatorTestUtilities.getCepTestHarness(getKeyedCepOpearator(isProcessingTime));
 	}
 
-	private SelectCepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator(boolean isProcessingTime) {
+	private CepOperator<Event, Integer, Map<String, List<Event>>> getKeyedCepOpearator(boolean isProcessingTime) {
 		return CepOperatorTestUtilities.getKeyedCepOpearator(isProcessingTime, new CEPOperatorTest.NFAFactory());
 	}
 
@@ -1323,4 +1306,29 @@ public boolean filter(Event value) throws Exception {
 			return NFACompiler.compileFactory(pattern, handleTimeout).createNFA();
 		}
 	}
+
+	private static class TimedOutProcessFunction extends PatternProcessFunction<Event, Map<String, List<Event>>>
+		implements TimedOutPartialMatchHandler<Event> {
+
+		private final OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOutTag;
+
+		private TimedOutProcessFunction(OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOutTag) {
+			this.timedOutTag = timedOutTag;
+		}
+
+		@Override
+		public void processMatch(
+			Map<String, List<Event>> match,
+			PatternProcessFunction.Context ctx,
+			Collector<Map<String, List<Event>>> out) throws Exception {
+			out.collect(match);
+		}
+
+		@Override
+		public void processTimedOutMatch(
+			Map<String, List<Event>> match,
+			PatternProcessFunction.Context ctx) throws Exception {
+			ctx.output(timedOutTag, Tuple2.of(match, ctx.timestamp()));
+		}
+	}
 }
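
For readers unfamiliar with these harness tests: the pattern is to open the operator, feed timestamped elements, advance the watermark to trigger event-time timeouts, and then inspect the main and side outputs. A hedged sketch, assuming a harness like the ones built above (values and timestamps are illustrative):

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.Event;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.OutputTag;

import java.util.List;
import java.util.Map;

public class HarnessSketch {

    static void drive(
            OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness,
            OutputTag<Tuple2<Map<String, List<Event>>, Long>> timedOutTag) throws Exception {

        harness.open();
        // one start event at event time 5, then a watermark far past the
        // pattern's time window: timeouts fire without any further elements
        harness.processElement(new StreamRecord<>(new Event(1, "start", 1.0), 5L));
        harness.processWatermark(new Watermark(100L));

        // full matches land on the main output...
        Object mainOutput = harness.getOutput().poll();
        // ...timed out partial matches on the registered side output
        Object timedOut = harness.getSideOutput(timedOutTag).poll();

        harness.close();
    }
}
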
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
index abc4b186a3b..2b4bba141ab 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepOperatorTestUtilities.java
@@ -22,17 +22,18 @@
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.cep.Event;
 import org.apache.flink.cep.EventComparator;
-import org.apache.flink.cep.PatternSelectFunction;
+import org.apache.flink.cep.functions.PatternProcessFunction;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
 import org.apache.flink.util.OutputTag;
 
 import java.util.List;
 import java.util.Map;
 
 /**
- * Utility methods for creating test {@link AbstractKeyedCEPPatternOperator}.
+ * Utility methods for creating a test {@link CepOperator}.
  */
 public class CepOperatorTestUtilities {
 
@@ -46,8 +47,8 @@ public Integer getKey(Event value) throws Exception {
 		}
 	}
 
-	public static OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> getCepTestHarness(
-		SelectCepOperator<Event, Integer, Map<String, List<Event>>> cepOperator) throws Exception {
+	public static <T> OneInputStreamOperatorTestHarness<Event, T> getCepTestHarness(
+		CepOperator<Event, Integer, T> cepOperator) throws Exception {
 		KeySelector<Event, Integer> keySelector = new TestKeySelector();
 
 		return new KeyedOneInputStreamOperatorTestHarness<>(
@@ -56,14 +57,14 @@ public Integer getKey(Event value) throws Exception {
 			BasicTypeInfo.INT_TYPE_INFO);
 	}
 
-	public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
+	public static <K> CepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
 		boolean isProcessingTime,
 		NFACompiler.NFAFactory<Event> nfaFactory) {
 
 		return getKeyedCepOpearator(isProcessingTime, nfaFactory, null);
 	}
 
-	public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
+	public static <K> CepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
 			boolean isProcessingTime,
 			NFACompiler.NFAFactory<Event> nfaFactory,
 			EventComparator<Event> comparator) {
@@ -71,26 +72,30 @@ public Integer getKey(Event value) throws Exception {
 		return getKeyedCepOpearator(isProcessingTime, nfaFactory, comparator, null);
 	}
 
-	public static <K> SelectCepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
+	public static <K> CepOperator<Event, K, Map<String, List<Event>>> getKeyedCepOpearator(
 			boolean isProcessingTime,
 			NFACompiler.NFAFactory<Event> nfaFactory,
 			EventComparator<Event> comparator,
 			OutputTag<Event> outputTag) {
 
-		return new SelectCepOperator<>(
+		return new CepOperator<>(
 			Event.createTypeSerializer(),
 			isProcessingTime,
 			nfaFactory,
 			comparator,
 			null,
-			new PatternSelectFunction<Event, Map<String, List<Event>>>() {
+			new PatternProcessFunction<Event, Map<String, List<Event>>>() {
 				private static final long serialVersionUID = -7143807777582726991L;
 
 				@Override
-				public Map<String, List<Event>> select(Map<String, List<Event>> pattern) throws Exception {
-					return pattern;
+				public void processMatch(
+						Map<String, List<Event>> match,
+						Context ctx,
+						Collector<Map<String, List<Event>>> out) throws Exception {
+					out.collect(match);
 				}
-			}, outputTag);
+			},
+			outputTag);
 	}
 
 	private CepOperatorTestUtilities() {
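
Taken together, these utilities let a test stand up a keyed CepOperator for an arbitrary output type. A minimal sketch of the intended usage inside a test method; `nfaFactory` is a placeholder NFACompiler.NFAFactory<Event>, and Event(id, name, price) is the constructor used elsewhere in these tests:

    // operator keyed by Integer, emitting the raw match map
    CepOperator<Event, Integer, Map<String, List<Event>>> operator =
        CepOperatorTestUtilities.getKeyedCepOpearator(true, nfaFactory);

    OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness =
        CepOperatorTestUtilities.getCepTestHarness(operator);

    harness.open();
    // push one element through the operator with timestamp 1
    harness.processElement(new StreamRecord<>(new Event(1, "a", 1.0), 1L));
    // drain harness.getOutput() and assert on it here
    harness.close();
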
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
new file mode 100644
index 00000000000..09477487880
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepProcessFunctionContextTest.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+import static org.apache.flink.cep.operator.CepOperatorTestUtilities.getCepTestHarness;
+import static org.apache.flink.cep.utils.EventBuilder.event;
+import static org.apache.flink.cep.utils.OutputAsserter.assertOutput;
+
+/**
+ * Tests for {@link CepOperator} which check that the {@link PatternProcessFunction.Context} is set up properly.
+ */
+public class CepProcessFunctionContextTest extends TestLogger {
+
+	private static final boolean PROCESSING_TIME = true;
+	private static final boolean EVENT_TIME = false;
+
+	@Test
+	public void testTimestampPassingInEventTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(1),
+					new NFAForwardingFactory(),
+					EVENT_TIME))) {
+			harness.open();
+
+			// events arrive out of order to verify that internal sorting does not mess up the timestamps
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+
+			harness.processWatermark(6);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("3:B")
+				.nextElementEquals("5:A")
+				.watermarkEquals(6L)
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testTimestampPassingInProcessingTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(1),
+					new NFAForwardingFactory(),
+					PROCESSING_TIME))) {
+			harness.open();
+
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("(NO_TIMESTAMP):A")
+				.nextElementEquals("(NO_TIMESTAMP):B")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeInProcessingTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(1),
+					new NFAForwardingFactory(),
+					PROCESSING_TIME))) {
+			harness.open();
+
+			harness.setProcessingTime(15);
+			harness.processElement(event().withName("A").asStreamRecord());
+			harness.setProcessingTime(35);
+			harness.processElement(event().withName("B").asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("15:A")
+				.nextElementEquals("35:B")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeInEventTime() throws Exception {
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(1),
+					new NFAForwardingFactory(),
+					EVENT_TIME))) {
+			harness.open();
+
+			harness.setProcessingTime(10);
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.setProcessingTime(100);
+			harness.processWatermark(6);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("100:A")
+				.watermarkEquals(6)
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testTimestampPassingForTimedOutInEventTime() throws Exception {
+
+		OutputTag<String> timedOut = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(2, timedOut),
+					new NFATimingOutFactory(),
+					EVENT_TIME))) {
+			harness.open();
+
+			// events arrive out of order to verify that internal sorting does not mess up the timestamps
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("C").withTimestamp(20).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(3).asStreamRecord());
+
+			harness.processWatermark(22);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("5:B:A")
+				.watermarkEquals(22)
+				.hasNoMoreElements();
+
+			assertOutput(harness.getSideOutput(timedOut))
+				.nextElementEquals("15:A")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testTimestampPassingForTimedOutInProcessingTime() throws Exception {
+
+		OutputTag<String> timedOut = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractTimestampAndNames(2, timedOut),
+					new NFATimingOutFactory(),
+					PROCESSING_TIME))) {
+			harness.open();
+
+			harness.setProcessingTime(3);
+			harness.processElement(event().withName("A").withTimestamp(3).asStreamRecord());
+			harness.setProcessingTime(5);
+			harness.processElement(event().withName("C").withTimestamp(5).asStreamRecord());
+			harness.setProcessingTime(20);
+			harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("(NO_TIMESTAMP):A:C")
+				.hasNoMoreElements();
+
+			assertOutput(harness.getSideOutput(timedOut))
+				.nextElementEquals("(NO_TIMESTAMP):C")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeForTimedOutInEventTime() throws Exception {
+
+		OutputTag<String> sideOutputTag = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
+					new NFATimingOutFactory(),
+					EVENT_TIME))) {
+			harness.open();
+
+			// events arrive out of order to verify that internal sorting does not mess up the timestamps
+			harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
+			harness.processElement(event().withName("B").withTimestamp(20).asStreamRecord());
+			harness.processElement(event().withName("C").withTimestamp(3).asStreamRecord());
+
+			harness.setProcessingTime(100);
+			harness.processWatermark(22);
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("100:C:A")
+				.watermarkEquals(22)
+				.hasNoMoreElements();
+
+			assertOutput(harness.getSideOutput(sideOutputTag))
+				.nextElementEquals("100:A")
+				.hasNoMoreElements();
+		}
+	}
+
+	@Test
+	public void testCurrentProcessingTimeForTimedOutInProcessingTime() throws Exception {
+
+		OutputTag<String> sideOutputTag = new OutputTag<String>("timedOut") {};
+
+		try (
+			OneInputStreamOperatorTestHarness<Event, String> harness = getCepTestHarness(
+				createCepOperator(
+					extractCurrentProcessingTimeAndNames(2, sideOutputTag),
+					new NFATimingOutFactory(),
+					PROCESSING_TIME))) {
+			harness.open();
+
+			harness.setProcessingTime(3);
+			harness.processElement(event().withName("A").asStreamRecord());
+			harness.setProcessingTime(5);
+			harness.processElement(event().withName("B").asStreamRecord());
+			harness.setProcessingTime(20);
+			harness.processElement(event().withName("C").asStreamRecord());
+
+			assertOutput(harness.getOutput())
+				.nextElementEquals("5:A:B")
+				.hasNoMoreElements();
+
+			// in processing time, a timeout is currently triggered only by the next incoming event, hence the 20
+			assertOutput(harness.getSideOutput(sideOutputTag))
+				.nextElementEquals("20:B")
+				.hasNoMoreElements();
+		}
+	}
+
+	/* TEST UTILS */
+
+	private <T> CepOperator<Event, Integer, T> createCepOperator(
+			PatternProcessFunction<Event, T> processFunction,
+			NFACompiler.NFAFactory<Event> nfaFactory,
+			boolean isProcessingTime) throws Exception {
+		return new CepOperator<>(
+			Event.createTypeSerializer(),
+			isProcessingTime,
+			nfaFactory,
+			null,
+			null,
+			processFunction,
+			null);
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[timestamp]:[Event.getName]...</pre> with one Event.getName entry per matched state,
+	 * up to stateNumber entries. States absent from the match are omitted from the output.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractTimestampAndNames(int stateNumber) {
+		return new AccessContextWithNames(stateNumber,
+			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[timestamp]:[Event.getName]...</pre> with one Event.getName entry per matched state,
+	 * up to stateNumber entries. States absent from the match are omitted from the output.
+	 *
+	 * <p>This function applies the same logic to timed out partial matches and emits those results
+	 * to the side output identified by the given output tag.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @param timedOutTag output tag where to emit timed out partial matches
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractTimestampAndNames(
+			int stateNumber,
+			OutputTag<String> timedOutTag) {
+		return new AccessContextWithNamesWithTimedOut(stateNumber,
+			timedOutTag,
+			context -> context.timestamp() != null ? String.valueOf(context.timestamp()) : NO_TIMESTAMP);
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[currentProcessingTime]:[Event.getName]...</pre> with one Event.getName entry per matched
+	 * state, up to stateNumber entries. States absent from the match are omitted from the output.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractCurrentProcessingTimeAndNames(int stateNumber) {
+		return new AccessContextWithNames(stateNumber, context -> String.valueOf(context.currentProcessingTime()));
+	}
+
+	/**
+	 * Creates a {@link PatternProcessFunction} that produces Strings of the form
+	 * <pre>[currentProcessingTime]:[Event.getName]...</pre> with one Event.getName entry per matched
+	 * state, up to stateNumber entries. States absent from the match are omitted from the output.
+	 *
+	 * <p>This function applies the same logic to timed out partial matches and emits those results
+	 * to the side output identified by the given output tag.
+	 *
+	 * @param stateNumber number of states in the pattern
+	 * @param timedOutTag output tag where to emit timed out partial matches
+	 * @return created PatternProcessFunction
+	 */
+	private static PatternProcessFunction<Event, String> extractCurrentProcessingTimeAndNames(
+			int stateNumber,
+			OutputTag<String> timedOutTag) {
+		return new AccessContextWithNamesWithTimedOut(stateNumber,
+			timedOutTag,
+			context -> String.valueOf(context.currentProcessingTime()));
+	}
+
+	private static final String NO_TIMESTAMP = "(NO_TIMESTAMP)";
+
+	static class AccessContextWithNames extends PatternProcessFunction<Event, String> {
+
+		private final int stateCount;
+		private final Function<Context, String> contextAccessor;
+
+		AccessContextWithNames(int stateCount, Function<Context, String> contextAccessor) {
+			this.stateCount = stateCount;
+			this.contextAccessor = contextAccessor;
+		}
+
+		@Override
+		public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) throws Exception {
+			out.collect(extractResult(match, ctx));
+		}
+
+		String extractResult(Map<String, List<Event>> match, Context ctx) {
+			StringBuilder stringBuilder = new StringBuilder(contextAccessor.apply(ctx));
+			for (int i = 1; i <= stateCount; i++) {
+				List<Event> events = match.get("" + i);
+				if (events != null) {
+					stringBuilder.append(":").append(events.get(0).getName());
+				}
+			}
+			return stringBuilder.toString();
+		}
+	}
+
+	static final class AccessContextWithNamesWithTimedOut extends AccessContextWithNames
+		implements TimedOutPartialMatchHandler<Event> {
+
+		private final OutputTag<String> outputTag;
+
+		AccessContextWithNamesWithTimedOut(
+				int stateCount,
+				OutputTag<String> outputTag,
+				Function<Context, String> contextAccessor) {
+			super(stateCount, contextAccessor);
+			this.outputTag = outputTag;
+		}
+
+		@Override
+		public void processTimedOutMatch(Map<String, List<Event>> match, Context ctx) throws Exception {
+			ctx.output(outputTag, extractResult(match, ctx));
+		}
+	}
+
+	/**
+	 * This NFA consists of one state accepting any element.
+	 */
+	private static class NFAForwardingFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.begin("1");
+
+			return NFACompiler.compileFactory(pattern, false).createNFA();
+		}
+	}
+
+	/**
+	 * This NFA consists of two states accepting any element. It times out after 10 milliseconds.
+	 */
+	private static class NFATimingOutFactory implements NFACompiler.NFAFactory<Event> {
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		@Override
+		public NFA<Event> createNFA() {
+
+			Pattern<Event, ?> pattern = Pattern.<Event>begin("1").next("2").within(Time.milliseconds(10));
+
+			return NFACompiler.compileFactory(pattern, true).createNFA();
+		}
+	}
+
+}
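
The contract these tests pin down is worth stating once: ctx.timestamp() is the timestamp of the element that completed the match and is null when the input carries no timestamps (hence "(NO_TIMESTAMP)" above), while ctx.currentProcessingTime() is always defined. A sketch of a function reading both; the output format is illustrative only:

    PatternProcessFunction<Event, String> fn = new PatternProcessFunction<Event, String>() {
        @Override
        public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<String> out) {
            Long eventTime = ctx.timestamp();            // null if the element had no timestamp
            long procTime = ctx.currentProcessingTime(); // always available
            String ts = eventTime == null ? "(NO_TIMESTAMP)" : String.valueOf(eventTime);
            out.collect(ts + "@" + procTime);
        }
    };
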
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
similarity index 66%
rename from flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
rename to flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
index ef7ee89881e..b77dd947cf1 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CepRuntimeContextTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CepRuntimeContextTest.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cep;
+package org.apache.flink.cep.operator;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.accumulators.Accumulator;
@@ -25,7 +25,6 @@
 import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.functions.RuntimeContext;
 import org.apache.flink.api.common.state.AggregatingStateDescriptor;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -33,100 +32,84 @@
 import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.cep.pattern.conditions.RichAndCondition;
-import org.apache.flink.cep.pattern.conditions.RichCompositeIterativeCondition;
-import org.apache.flink.cep.pattern.conditions.RichIterativeCondition;
-import org.apache.flink.cep.pattern.conditions.RichNotCondition;
-import org.apache.flink.cep.pattern.conditions.RichOrCondition;
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.pattern.Pattern;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
 
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.cep.operator.CepOperatorTestUtilities.getCepTestHarness;
+import static org.apache.flink.cep.operator.CepRuntimeContextTest.MockProcessFunctionAsserter.assertFunction;
+import static org.apache.flink.cep.utils.CepOperatorBuilder.createOperatorForNFA;
+import static org.apache.flink.cep.utils.EventBuilder.event;
+import static org.hamcrest.core.Is.is;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 /**
  * Test cases for {@link CepRuntimeContext}.
  */
-public class CepRuntimeContextTest {
+public class CepRuntimeContextTest extends TestLogger {
 
 	@Test
-	public void testRichCompositeIterativeCondition() throws Exception {
-		RichIterativeCondition<Integer> first = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> second = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> third = new TestRichIterativeCondition();
-
-		RichCompositeIterativeCondition function = new RichCompositeIterativeCondition(first, second, third) {
-			@Override
-			public boolean filter(Object value, Context ctx) throws Exception {
-				return false;
-			}
-		};
-		function.setRuntimeContext(mock(RuntimeContext.class));
-
-		assertTrue(first.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(second.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(third.getRuntimeContext() instanceof CepRuntimeContext);
-	}
+	public void testCepRuntimeContextIsSetInNFA() throws Exception {
 
-	@Test
-	public void testRichAndCondition() throws Exception {
-		RichIterativeCondition<Integer> left = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> right = new TestRichIterativeCondition();
+		@SuppressWarnings("unchecked")
+		final NFA<Event> mockNFA = mock(NFA.class);
 
-		RichAndCondition function = new RichAndCondition<>(left, right);
-		function.setRuntimeContext(mock(RuntimeContext.class));
+		try (
+			OneInputStreamOperatorTestHarness<Event, Map<String, List<Event>>> harness = getCepTestHarness(
+				createOperatorForNFA(mockNFA).build())) {
 
-		assertTrue(left.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(right.getRuntimeContext() instanceof CepRuntimeContext);
+			harness.open();
+			verify(mockNFA).open(any(CepRuntimeContext.class), any(Configuration.class));
+		}
 	}
 
 	@Test
-	public void testRichOrCondition() throws Exception {
-		RichIterativeCondition<Integer> left = new TestRichIterativeCondition();
-		RichIterativeCondition<Integer> right = new TestRichIterativeCondition();
-
-		RichOrCondition function = new RichOrCondition<>(left, right);
-		function.setRuntimeContext(mock(RuntimeContext.class));
-
-		assertTrue(left.getRuntimeContext() instanceof CepRuntimeContext);
-		assertTrue(right.getRuntimeContext() instanceof CepRuntimeContext);
-	}
+	public void testCepRuntimeContextIsSetInProcessFunction() throws Exception {
 
-	@Test
-	public void testRichNotCondition() {
-		RichIterativeCondition<Integer> original = new TestRichIterativeCondition();
+		final VerifyRuntimeContextProcessFunction processFunction = new VerifyRuntimeContextProcessFunction();
 
-		RichNotCondition function = new RichNotCondition<>(original);
-		function.setRuntimeContext(mock(RuntimeContext.class));
+		try (
+			OneInputStreamOperatorTestHarness<Event, Event> harness = getCepTestHarness(
+				createOperatorForNFA(getSingleElementAlwaysTrueNFA())
+					.withFunction(processFunction)
+					.build())) {
 
-		assertTrue(original.getRuntimeContext() instanceof CepRuntimeContext);
-	}
+			harness.open();
+			Event record = event().withName("A").build();
+			harness.processElement(record, 0);
 
-	@Test
-	public void testRichPatternSelectFunction() {
-		verifyRuntimeContext(new TestRichPatternSelectFunction());
+		}
+
+		assertFunction(processFunction)
+			.checkOpenCalled()
+			.checkCloseCalled()
+			.checkProcessMatchCalled();
 	}
 
-	@Test
-	public void testRichPatternFlatSelectFunction() {
-		verifyRuntimeContext(new TestRichPatternFlatSelectFunction());
+	private NFA<Event> getSingleElementAlwaysTrueNFA() {
+		return NFACompiler.compileFactory(Pattern.<Event>begin("A"), false).createNFA();
 	}
 
 	@Test
-	public void testRichIterativeCondition() {
-		verifyRuntimeContext(new TestRichIterativeCondition());
-	}
-
-	private void verifyRuntimeContext(final RichFunction function) {
+	public void testCepRuntimeContext() {
 		final String taskName = "foobarTask";
 		final MetricGroup metricGroup = new UnregisteredMetricsGroup();
 		final int numberOfParallelSubtasks = 42;
@@ -149,11 +132,8 @@ private void verifyRuntimeContext(final RichFunction function) {
 		when(mockedRuntimeContext.getUserCodeClassLoader()).thenReturn(userCodeClassLoader);
 		when(mockedRuntimeContext.getDistributedCache()).thenReturn(distributedCache);
 
-		function.setRuntimeContext(mockedRuntimeContext);
-
-		RuntimeContext runtimeContext = function.getRuntimeContext();
+		RuntimeContext runtimeContext = new CepRuntimeContext(mockedRuntimeContext);
 
-		assertTrue(runtimeContext instanceof CepRuntimeContext);
 		assertEquals(taskName, runtimeContext.getTaskName());
 		assertEquals(metricGroup, runtimeContext.getMetricGroup());
 		assertEquals(numberOfParallelSubtasks, runtimeContext.getNumberOfParallelSubtasks());
@@ -289,30 +269,64 @@ private void verifyRuntimeContext(final RichFunction function) {
 		}
 	}
 
-	private static class TestRichIterativeCondition extends RichIterativeCondition<Integer> {
-		private static final long serialVersionUID = 1L;
+	/* Test Utils */
+	static class MockProcessFunctionAsserter {
+		private final VerifyRuntimeContextProcessFunction function;
 
-		@Override
-		public boolean filter(Integer value, Context<Integer> ctx) throws Exception {
-			return false;
+		static MockProcessFunctionAsserter assertFunction(VerifyRuntimeContextProcessFunction function) {
+			return new MockProcessFunctionAsserter(function);
+		}
+
+		private MockProcessFunctionAsserter(VerifyRuntimeContextProcessFunction function) {
+			this.function = function;
+		}
+
+		MockProcessFunctionAsserter checkOpenCalled() {
+			assertThat(function.openCalled, is(true));
+			return this;
+		}
+
+		MockProcessFunctionAsserter checkCloseCalled() {
+			assertThat(function.closeCalled, is(true));
+			return this;
+		}
+
+		MockProcessFunctionAsserter checkProcessMatchCalled() {
+			assertThat(function.processMatchCalled, is(true));
+			return this;
 		}
 	}
 
-	private static class TestRichPatternSelectFunction extends RichPatternSelectFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
+	private static class VerifyRuntimeContextProcessFunction extends PatternProcessFunction<Event, Event> {
+
+		boolean openCalled = false;
+		boolean closeCalled = false;
+		boolean processMatchCalled = false;
 
 		@Override
-		public Integer select(Map<String, List<Integer>> pattern) throws Exception {
-			return null;
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			verifyContext();
+			openCalled = true;
+		}
+
+		private void verifyContext() {
+			if (!(getRuntimeContext() instanceof CepRuntimeContext)) {
+				fail("Runtime context was not wrapped in CepRuntimeContext");
+			}
 		}
-	}
 
-	private static class TestRichPatternFlatSelectFunction extends RichPatternFlatSelectFunction<Integer, Integer> {
-		private static final long serialVersionUID = 1L;
+		@Override
+		public void close() throws Exception {
+			super.close();
+			verifyContext();
+			closeCalled = true;
+		}
 
 		@Override
-		public void flatSelect(Map<String, List<Integer>> pattern, Collector<Integer> out) throws Exception {
-			// no op
+		public void processMatch(Map<String, List<Event>> match, Context ctx, Collector<Event> out) throws Exception {
+			verifyContext();
+			processMatchCalled = true;
 		}
 	}
 }
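
The rewritten test constructs CepRuntimeContext directly, which makes the wrapper's contract explicit: read-only metadata calls are forwarded to the wrapped RuntimeContext, while state and accumulator registration is meant to be rejected. A rough sketch of that contract; the exact exception type is an assumption based on the wrapper's read-only design, not something this diff shows:

    RuntimeContext wrapped = new CepRuntimeContext(underlyingRuntimeContext);

    wrapped.getTaskName();                  // forwarded to the wrapped context
    wrapped.getNumberOfParallelSubtasks();  // forwarded to the wrapped context

    try {
        wrapped.getState(new ValueStateDescriptor<>("s", String.class));
        fail("state registration should not be supported");
    } catch (UnsupportedOperationException expected) {
        // CepRuntimeContext blocks direct state access by user functions
    }
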
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/CepOperatorBuilder.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/CepOperatorBuilder.java
new file mode 100644
index 00000000000..ef054469c09
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/CepOperatorBuilder.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.cep.EventComparator;
+import org.apache.flink.cep.functions.PatternProcessFunction;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.cep.operator.CepOperator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Test builder for a CEP operator that accepts {@link Event} as input elements.
+ *
+ * @param <OUT> type of output elements
+ */
+public class CepOperatorBuilder<OUT> {
+
+	private final boolean isProcessingTime;
+	private final NFACompiler.NFAFactory<Event> nfaFactory;
+	private final EventComparator<Event> comparator;
+	private final AfterMatchSkipStrategy skipStrategy;
+	private final PatternProcessFunction<Event, OUT> function;
+	private final OutputTag<Event> lateDataOutputTag;
+
+	public static CepOperatorBuilder<Map<String, List<Event>>> createOperatorForNFA(NFA<Event> nfa) {
+		return new CepOperatorBuilder<>(
+			true,
+			new NFACompiler.NFAFactory<Event>() {
+				@Override
+				public NFA<Event> createNFA() {
+					return nfa;
+				}
+			},
+			null,
+			null,
+			new PatternProcessFunction<Event, Map<String, List<Event>>>() {
+				private static final long serialVersionUID = -7143807777582726991L;
+
+				@Override
+				public void processMatch(
+					Map<String, List<Event>> match,
+					Context ctx,
+					Collector<Map<String, List<Event>>> out) throws Exception {
+					out.collect(match);
+				}
+			},
+			null);
+	}
+
+	public static CepOperatorBuilder<Map<String, List<Event>>> createOperatorForNFAFactory(NFACompiler.NFAFactory<Event> nfaFactory) {
+		return new CepOperatorBuilder<>(
+			true,
+			nfaFactory,
+			null,
+			null,
+			new PatternProcessFunction<Event, Map<String, List<Event>>>() {
+				private static final long serialVersionUID = -7143807777582726991L;
+
+				@Override
+				public void processMatch(
+					Map<String, List<Event>> match,
+					Context ctx,
+					Collector<Map<String, List<Event>>> out) throws Exception {
+					out.collect(match);
+				}
+			},
+			null);
+	}
+
+	private CepOperatorBuilder(
+		boolean isProcessingTime,
+		NFACompiler.NFAFactory<Event> nfaFactory,
+		EventComparator<Event> comparator,
+		AfterMatchSkipStrategy skipStrategy,
+		PatternProcessFunction<Event, OUT> processFunction,
+		OutputTag<Event> lateDataOutputTag) {
+		this.isProcessingTime = isProcessingTime;
+		this.nfaFactory = nfaFactory;
+		this.comparator = comparator;
+		this.skipStrategy = skipStrategy;
+		function = processFunction;
+		this.lateDataOutputTag = lateDataOutputTag;
+	}
+
+	public CepOperatorBuilder<OUT> inProcessingTime() {
+		return new CepOperatorBuilder<>(
+			true,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> inEventTime() {
+		return new CepOperatorBuilder<>(
+			false,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withComparator(EventComparator<Event> comparator) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withSkipStrategy(AfterMatchSkipStrategy skipStrategy) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withLateDataOutputTag(OutputTag<Event> lateDataOutputTag) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public CepOperatorBuilder<OUT> withNFA(NFA<Event> nfa) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			new NFACompiler.NFAFactory<Event>() {
+				@Override
+				public NFA<Event> createNFA() {
+					return nfa;
+				}
+			},
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+
+	public <T> CepOperatorBuilder<T> withFunction(PatternProcessFunction<Event, T> processFunction) {
+		return new CepOperatorBuilder<>(
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			processFunction,
+			lateDataOutputTag);
+	}
+
+	public <K> CepOperator<Event, K, OUT> build() {
+		return new CepOperator<>(Event.createTypeSerializer(),
+			isProcessingTime,
+			nfaFactory,
+			comparator,
+			skipStrategy,
+			function,
+			lateDataOutputTag);
+	}
+}
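
Since every in*/with* call on the builder returns a fresh instance, partially configured builders can be shared between tests without side effects. A typical construction, with `nfaFactory` and `comparator` as placeholders:

    CepOperator<Event, Integer, Map<String, List<Event>>> operator =
        CepOperatorBuilder
            .createOperatorForNFAFactory(nfaFactory)
            .inEventTime()
            .withComparator(comparator)
            .build();
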
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/EventBuilder.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/EventBuilder.java
new file mode 100644
index 00000000000..6915de0a492
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/EventBuilder.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.cep.Event;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+/**
+ * Builder for {@link Event} that is used in cep tests.
+ */
+public class EventBuilder {
+
+	private final int id;
+	private final double price;
+	private final String name;
+	private final Long timestamp;
+
+	private EventBuilder(int id, double price, String name, Long timestamp) {
+		this.id = id;
+		this.price = price;
+		this.name = name;
+		this.timestamp = timestamp;
+	}
+
+	public static EventBuilder event() {
+		return new EventBuilder(0, 0.0, "", null);
+	}
+
+	public EventBuilder withId(int id) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public EventBuilder withPrice(double price) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public EventBuilder withName(String name) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public EventBuilder withTimestamp(long timestamp) {
+		return new EventBuilder(id, price, name, timestamp);
+	}
+
+	public StreamRecord<Event> asStreamRecord() {
+		if (timestamp != null) {
+			return new StreamRecord<>(build(), timestamp);
+		} else {
+			return new StreamRecord<>(build());
+		}
+	}
+
+	public Event build() {
+		return new Event(id, name, price);
+	}
+}
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java
new file mode 100644
index 00000000000..7457331112c
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/utils/OutputAsserter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.utils;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+
+import java.util.Queue;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Asserter for output from {@link OneInputStreamOperatorTestHarness}.
+ */
+public class OutputAsserter {
+
+	private final Queue<?> output;
+
+	private OutputAsserter(Queue<?> output) {
+		this.output = output;
+	}
+
+	public static OutputAsserter assertOutput(Queue<?> output) {
+		return new OutputAsserter(output);
+	}
+
+	private AssertionError fail(Object record) {
+		return new AssertionError("Received unexpected element: " + record);
+	}
+
+	public <T> OutputAsserter nextElementEquals(T expected) {
+		final Object record = output.poll();
+		final Object actual;
+		if (record instanceof StreamRecord) {
+			// This is in case we assert side output
+			actual = ((StreamRecord) record).getValue();
+		} else {
+			// This is in case we assert the main output
+			actual = record;
+		}
+		assertThat(actual, is(expected));
+		return this;
+	}
+
+	public void hasNoMoreElements() {
+		assertTrue(output.isEmpty());
+	}
+
+	public OutputAsserter watermarkEquals(long timestamp) {
+		Object record = output.poll();
+		if (record instanceof Watermark) {
+			Watermark watermark = (Watermark) record;
+			assertThat(watermark.getTimestamp(), is(timestamp));
+		} else {
+			throw fail(record);
+		}
+		return this;
+	}
+}
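
EventBuilder and OutputAsserter combine into the fluent style used throughout CepProcessFunctionContextTest above. A representative fragment; the expected "5:A" string depends on the process function under test:

    harness.processElement(event().withName("A").withTimestamp(5).asStreamRecord());
    harness.processWatermark(6);

    assertOutput(harness.getOutput())
        .nextElementEquals("5:A")
        .watermarkEquals(6L)
        .hasNoMoreElements();
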
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
index cbb395e232f..fde93ff1679 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MaxAggFunctionWithRetract.scala
@@ -18,19 +18,20 @@
 package org.apache.flink.table.functions.aggfunctions
 
 import java.math.BigDecimal
-import java.util.{HashMap => JHashMap}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
-import org.apache.flink.table.api.Types
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
+import org.apache.flink.table.api.dataview.MapView
 import org.apache.flink.table.functions.aggfunctions.Ordering._
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Max with retraction aggregate function */
-class MaxWithRetractAccumulator[T] extends JTuple2[T, JHashMap[T, Long]]
+class MaxWithRetractAccumulator[T] {
+  var max: T = _
+  var distinctCount: JLong = _
+  var map: MapView[T, JLong] = _
+}
 
 /**
   * Base class for built-in Max with retraction aggregate function
@@ -42,8 +43,10 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T])
 
   override def createAccumulator(): MaxWithRetractAccumulator[T] = {
     val acc = new MaxWithRetractAccumulator[T]
-    acc.f0 = getInitValue //max
-    acc.f1 = new JHashMap[T, Long]() //store the count for each value
+    acc.max = getInitValue //max
+    acc.distinctCount = 0L
+    acc.map = new MapView(getValueTypeInfo, Types.LONG)
+      .asInstanceOf[MapView[T, JLong]] //store the count for each value
     acc
   }
 
@@ -51,16 +54,17 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T])
     if (value != null) {
       val v = value.asInstanceOf[T]
 
-      if (acc.f1.size() == 0 || (ord.compare(acc.f0, v) < 0)) {
-        acc.f0 = v
+      if (acc.distinctCount == 0 || (ord.compare(acc.max, v) < 0)) {
+        acc.max = v
       }
 
-      if (!acc.f1.containsKey(v)) {
-        acc.f1.put(v, 1L)
+      var count = acc.map.get(v)
+      if (count == null) {
+        acc.map.put(v, 1L)
+        acc.distinctCount += 1
       } else {
-        var count = acc.f1.get(v)
         count += 1L
-        acc.f1.put(v, count)
+        acc.map.put(v, count)
       }
     }
   }
@@ -69,39 +73,45 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T])
     if (value != null) {
       val v = value.asInstanceOf[T]
 
-      var count = acc.f1.get(v)
-      count -= 1L
-      if (count == 0) {
+      val count = acc.map.get(v)
+      if (count == null || count == 1) {
         //remove the key v from the map if the number of appearance of the value v is 0
-        acc.f1.remove(v)
+        if (count != null) {
+          acc.map.remove(v)
+        }
         //if the total count is 0, we could just simply set the f0(max) to the initial value
-        if (acc.f1.size() == 0) {
-          acc.f0 = getInitValue
+        acc.distinctCount -= 1
+        if (acc.distinctCount == 0) {
+          acc.max = getInitValue
           return
         }
         //if v is the current max value, we have to iterate the map to find the 2nd biggest
         // value to replace v as the max value
-        if (v == acc.f0) {
-          val iterator = acc.f1.keySet().iterator()
-          var key = iterator.next()
-          acc.f0 = key
+        if (v == acc.max) {
+          val iterator = acc.map.keys.iterator()
+          var hasMax = false
           while (iterator.hasNext) {
-            key = iterator.next()
-            if (ord.compare(acc.f0, key) < 0) {
-              acc.f0 = key
+            val key = iterator.next()
+            if (!hasMax || ord.compare(acc.max, key) < 0) {
+              acc.max = key
+              hasMax = true
             }
           }
+
+          if (!hasMax) {
+            acc.distinctCount = 0L
+          }
         }
       } else {
-        acc.f1.put(v, count)
+        acc.map.put(v, count - 1)
       }
     }
 
   }
 
   override def getValue(acc: MaxWithRetractAccumulator[T]): T = {
-    if (acc.f1.size() != 0) {
-      acc.f0
+    if (acc.distinctCount != 0) {
+      acc.max
     } else {
       null.asInstanceOf[T]
     }
@@ -112,19 +122,23 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T])
     val iter = its.iterator()
     while (iter.hasNext) {
       val a = iter.next()
-      if (a.f1.size() != 0) {
+      if (a.distinctCount != 0) {
         // set max element
-        if (ord.compare(acc.f0, a.f0) < 0) {
-          acc.f0 = a.f0
+        if (ord.compare(acc.max, a.max) < 0) {
+          acc.max = a.max
         }
         // merge the count for each key
-        val iterator = a.f1.keySet().iterator()
+        val iterator = a.map.entries.iterator()
         while (iterator.hasNext) {
-          val key = iterator.next()
-          if (acc.f1.containsKey(key)) {
-            acc.f1.put(key, acc.f1.get(key) + a.f1.get(key))
+          val entry = iterator.next()
+          val key = entry.getKey
+          val value = entry.getValue
+          val count = acc.map.get(key)
+          if (count != null) {
+            acc.map.put(key, count + value)
           } else {
-            acc.f1.put(key, a.f1.get(key))
+            acc.map.put(key, value)
+            acc.distinctCount += 1
           }
         }
       }
@@ -132,15 +146,9 @@ abstract class MaxWithRetractAggFunction[T](implicit ord: Ordering[T])
   }
 
   def resetAccumulator(acc: MaxWithRetractAccumulator[T]): Unit = {
-    acc.f0 = getInitValue
-    acc.f1.clear()
-  }
-
-  override def getAccumulatorType: TypeInformation[MaxWithRetractAccumulator[T]] = {
-    new TupleTypeInfo(
-      classOf[MaxWithRetractAccumulator[T]],
-      getValueTypeInfo,
-      new MapTypeInfo(getValueTypeInfo, BasicTypeInfo.LONG_TYPE_INFO))
+    acc.max = getInitValue
+    acc.distinctCount = 0L
+    acc.map.clear()
   }
 
   def getInitValue: T
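
The accumulator change above is the substantive part of this file: a Tuple2 holding a JHashMap forced the whole count map through (de)serialization on every state access, whereas a POJO with a MapView field lets the runtime substitute a state-backed view (e.g. RocksDB MapState) so only touched entries are read or written, and distinctCount answers "is the map empty" without scanning. A Java sketch of the same accumulator pattern for a user-defined aggregate; field names are illustrative, and it assumes the MapView(keyType, valueType) constructor from flink-table's dataview API:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.table.api.dataview.MapView;

    // POJO accumulator: the planner may back the MapView field with keyed state.
    public class CountPerValueAccumulator {
        public long distinctCount = 0L;
        public MapView<String, Long> counts = new MapView<>(Types.STRING, Types.LONG);
    }

    // accumulate step (MapView.get/put are declared to throw Exception):
    //   Long count = acc.counts.get(value);
    //   if (count == null) { acc.counts.put(value, 1L); acc.distinctCount++; }
    //   else { acc.counts.put(value, count + 1); }
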
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
index 480f836e849..f62f2eceb29 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/MinAggFunctionWithRetract.scala
@@ -18,19 +18,20 @@
 package org.apache.flink.table.functions.aggfunctions
 
 import java.math.BigDecimal
-import java.util.{HashMap => JHashMap}
-import java.lang.{Iterable => JIterable}
+import java.lang.{Iterable => JIterable, Long => JLong}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
-import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
-import org.apache.flink.api.java.typeutils.{MapTypeInfo, TupleTypeInfo}
-import org.apache.flink.table.api.Types
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation, Types}
+import org.apache.flink.table.api.dataview.MapView
 import org.apache.flink.table.functions.aggfunctions.Ordering._
 import org.apache.flink.table.functions.AggregateFunction
 
 /** The initial accumulator for Min with retraction aggregate function */
-class MinWithRetractAccumulator[T] extends JTuple2[T, JHashMap[T, Long]]
+class MinWithRetractAccumulator[T] {
+  var min: T = _
+  var distinctCount: JLong = _
+  var map: MapView[T, JLong] = _
+}
 
 /**
   * Base class for built-in Min with retraction aggregate function
@@ -42,8 +43,10 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T])
 
   override def createAccumulator(): MinWithRetractAccumulator[T] = {
     val acc = new MinWithRetractAccumulator[T]
-    acc.f0 = getInitValue //min
-    acc.f1 = new JHashMap[T, Long]() //store the count for each value
+    acc.min = getInitValue //min
+    acc.distinctCount = 0L
+    acc.map = new MapView(getValueTypeInfo, Types.LONG)
+      .asInstanceOf[MapView[T, JLong]] //store the count for each value
     acc
   }
 
@@ -51,16 +54,17 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T])
     if (value != null) {
       val v = value.asInstanceOf[T]
 
-      if (acc.f1.size() == 0 || (ord.compare(acc.f0, v) > 0)) {
-        acc.f0 = v
+      if (acc.distinctCount == 0 || (ord.compare(acc.min, v) > 0)) {
+        acc.min = v
       }
 
-      if (!acc.f1.containsKey(v)) {
-        acc.f1.put(v, 1L)
+      var count = acc.map.get(v)
+      if (count == null) {
+        acc.map.put(v, 1L)
+        acc.distinctCount += 1
       } else {
-        var count = acc.f1.get(v)
         count += 1L
-        acc.f1.put(v, count)
+        acc.map.put(v, count)
       }
     }
   }
@@ -69,39 +73,45 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T])
     if (value != null) {
       val v = value.asInstanceOf[T]
 
-      var count = acc.f1.get(v)
-      count -= 1L
-      if (count == 0) {
+      val count = acc.map.get(v)
+      if (count == null || count == 1) {
         //remove the key v from the map if the number of appearance of the value v is 0
-        acc.f1.remove(v)
+        if (count != null) {
+          acc.map.remove(v)
+        }
         //if the total count is 0, we could just simply set the f0(min) to the initial value
-        if (acc.f1.size() == 0) {
-          acc.f0 = getInitValue
+        acc.distinctCount -= 1
+        if (acc.distinctCount == 0) {
+          acc.min = getInitValue
           return
         }
         //if v is the current min value, we have to iterate the map to find the 2nd smallest
         // value to replace v as the min value
-        if (v == acc.f0) {
-          val iterator = acc.f1.keySet().iterator()
-          var key = iterator.next()
-          acc.f0 = key
+        if (v == acc.min) {
+          val iterator = acc.map.keys.iterator()
+          var hasMin = false
           while (iterator.hasNext) {
-            key = iterator.next()
-            if (ord.compare(acc.f0, key) > 0) {
-              acc.f0 = key
+            val key = iterator.next()
+            if (!hasMin || ord.compare(acc.min, key) > 0) {
+              acc.min = key
+              hasMin = true
             }
           }
+
+          if (!hasMin) {
+            acc.distinctCount = 0L
+          }
         }
       } else {
-        acc.f1.put(v, count)
+        acc.map.put(v, count - 1)
       }
     }
 
   }
 
   override def getValue(acc: MinWithRetractAccumulator[T]): T = {
-    if (acc.f1.size() != 0) {
-      acc.f0
+    if (acc.distinctCount != 0) {
+      acc.min
     } else {
       null.asInstanceOf[T]
     }
@@ -112,19 +122,23 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T])
     val iter = its.iterator()
     while (iter.hasNext) {
       val a = iter.next()
-      if (a.f1.size() != 0) {
+      if (a.distinctCount != 0) {
         // set min element
-        if (ord.compare(acc.f0, a.f0) > 0) {
-          acc.f0 = a.f0
+        if (ord.compare(acc.min, a.min) > 0) {
+          acc.min = a.min
         }
         // merge the count for each key
-        val iterator = a.f1.keySet().iterator()
+        val iterator = a.map.entries.iterator()
         while (iterator.hasNext) {
-          val key = iterator.next()
-          if (acc.f1.containsKey(key)) {
-            acc.f1.put(key, acc.f1.get(key) + a.f1.get(key))
+          val entry = iterator.next()
+          val key = entry.getKey
+          val value = entry.getValue
+          val count = acc.map.get(key)
+          if (count != null) {
+            acc.map.put(key, count + value)
           } else {
-            acc.f1.put(key, a.f1.get(key))
+            acc.map.put(key, value)
+            acc.distinctCount += 1
           }
         }
       }
@@ -132,15 +146,9 @@ abstract class MinWithRetractAggFunction[T](implicit ord: Ordering[T])
   }
 
   def resetAccumulator(acc: MinWithRetractAccumulator[T]): Unit = {
-    acc.f0 = getInitValue
-    acc.f1.clear()
-  }
-
-  override def getAccumulatorType: TypeInformation[MinWithRetractAccumulator[T]] = {
-    new TupleTypeInfo(
-      classOf[MinWithRetractAccumulator[T]],
-      getValueTypeInfo,
-      new MapTypeInfo(getValueTypeInfo, BasicTypeInfo.LONG_TYPE_INFO))
+    acc.min = getInitValue
+    acc.distinctCount = 0L
+    acc.map.clear()
   }
 
   def getInitValue: T
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
index f1386dfea46..f5cf1910b9a 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala
@@ -1250,23 +1250,17 @@ object AggregateUtil {
       aggregateInputTypes,
       tableConfig)
 
-    val (accumulatorType, accSpecs) = aggregateFunction match {
-      case collect: SqlAggFunction if collect.getKind == SqlKind.COLLECT =>
-        removeStateViewFieldsFromAccTypeInfo(
-          uniqueIdWithinAggregate,
-          aggregate,
-          aggregate.getAccumulatorType,
-          isStateBackedDataViews)
-
-      case udagg: AggSqlFunction =>
-        removeStateViewFieldsFromAccTypeInfo(
-          uniqueIdWithinAggregate,
-          aggregate,
-          udagg.accType,
-          isStateBackedDataViews)
+    val (accumulatorType, accSpecs) = {
+      val accType = aggregateFunction match {
+        case udagg: AggSqlFunction => udagg.accType
+        case _ => getAccumulatorTypeOfAggregateFunction(aggregate)
+      }
 
-      case _ =>
-        (getAccumulatorTypeOfAggregateFunction(aggregate), None)
+      removeStateViewFieldsFromAccTypeInfo(
+        uniqueIdWithinAggregate,
+        aggregate,
+        accType,
+        isStateBackedDataViews)
     }
 
     // create distinct accumulator filter argument
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala
index bdd1df04894..f0e273215c5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala
@@ -23,7 +23,7 @@ import java.math.BigDecimal
 import java.util.{ArrayList => JArrayList, List => JList}
 
 import org.apache.flink.table.functions.AggregateFunction
-import org.apache.flink.table.functions.aggfunctions.{DecimalAvgAccumulator, DecimalSumWithRetractAccumulator}
+import org.apache.flink.table.functions.aggfunctions.{DecimalAvgAccumulator, DecimalSumWithRetractAccumulator, MaxWithRetractAccumulator, MinWithRetractAccumulator}
 import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.junit.Assert.assertEquals
 import org.junit.Test
@@ -137,6 +137,12 @@ abstract class AggFunctionTestBase[T, ACC] {
       case (e: BigDecimal, r: BigDecimal) =>
         // BigDecimal.equals() value and scale but we are only interested in value.
         assert(e.compareTo(r) == 0)
+      case (e: MinWithRetractAccumulator[_], r: MinWithRetractAccumulator[_]) =>
+        assertEquals(e.min, r.min)
+        assertEquals(e.distinctCount, r.distinctCount)
+      case (e: MaxWithRetractAccumulator[_], r: MaxWithRetractAccumulator[_]) =>
+        assertEquals(e.max, r.max)
+        assertEquals(e.distinctCount, r.distinctCount)
       case _ =>
         assertEquals(expected, result)
     }
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
index 0549339381d..5c1a780e217 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/AggFunctionHarnessTest.scala
@@ -107,4 +107,84 @@ class AggFunctionHarnessTest extends HarnessTestBase {
 
     testHarness.close()
   }
+
+  @Test
+  def testMinMaxAggFunctionWithRetract(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+    val tEnv = TableEnvironment.getTableEnvironment(env)
+
+    val data = new mutable.MutableList[(JInt, JInt, String)]
+    val t = env.fromCollection(data).toTable(tEnv, 'a, 'b, 'c)
+    tEnv.registerTable("T", t)
+    val sqlQuery = tEnv.sqlQuery(
+      s"""
+         |SELECT
+         |  c, min(a), max(b)
+         |FROM (
+         |  SELECT a, b, c
+         |  FROM T
+         |  GROUP BY a, b, c
+         |) GROUP BY c
+         |""".stripMargin)
+
+    val testHarness = createHarnessTester[String, CRow, CRow](
+      sqlQuery.toRetractStream[Row](queryConfig), "groupBy")
+
+    testHarness.setStateBackend(getStateBackend)
+    testHarness.open()
+
+    val operator = getOperator(testHarness)
+    val minState = getState(
+      operator,
+      "function",
+      classOf[GroupAggProcessFunction],
+      "acc0_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+    val maxState = getState(
+      operator,
+      "function",
+      classOf[GroupAggProcessFunction],
+      "acc1_map_dataview").asInstanceOf[MapView[JInt, JInt]]
+    assertTrue(minState.isInstanceOf[StateMapView[_, _]])
+    assertTrue(maxState.isInstanceOf[StateMapView[_, _]])
+    assertTrue(operator.getKeyedStateBackend.isInstanceOf[RocksDBKeyedStateBackend[_]])
+
+    val expectedOutput = new ConcurrentLinkedQueue[Object]()
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, 1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", 1, 1), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(1: JInt, 1: JInt, "bbb"), 1))
+    expectedOutput.add(new StreamRecord(CRow("bbb", 1, 1), 1))
+
+    // min/max doesn't change
+    testHarness.processElement(new StreamRecord(CRow(2: JInt, 0: JInt, "aaa"), 1))
+
+    // min/max changed
+    testHarness.processElement(new StreamRecord(CRow(0: JInt, 2: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", 1, 1), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", 0, 2), 1))
+
+    // retract the min/max value
+    testHarness.processElement(new StreamRecord(CRow(false, 0: JInt, 2: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", 0, 2), 1))
+    expectedOutput.add(new StreamRecord(CRow("aaa", 1, 1), 1))
+
+    // remove some state: state may be cleaned up by the state backend
+    // if not accessed beyond ttl time
+    operator.setCurrentKey(Row.of("aaa"))
+    minState.remove(1)
+    maxState.remove(1)
+
+    // retract after state has been cleaned up
+    testHarness.processElement(new StreamRecord(CRow(false, 2: JInt, 0: JInt, "aaa"), 1))
+
+    testHarness.processElement(new StreamRecord(CRow(false, 1: JInt, 1: JInt, "aaa"), 1))
+    expectedOutput.add(new StreamRecord(CRow(false, "aaa", 1, 1), 1))
+
+    val result = testHarness.getOutput
+
+    verify(expectedOutput, result)
+
+    testHarness.close()
+  }
 }
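
A minimal sketch (hypothetical, reusing the simplified accumulator above; the real function operates on MapView state) of retract logic consistent with what the new harness test exercises: a retraction whose map entry was already cleaned up, e.g. by state TTL, is dropped silently, and retracting the current minimum triggers a rescan of the remaining values:

def retract(acc: MinWithRetractAccumulatorSketch, v: Int): Unit = {
  acc.map.get(v) match {
    case None =>
      // entry already evicted (e.g. cleaned up by the state backend);
      // ignore the retraction instead of corrupting the accumulator
    case Some(1L) =>
      acc.map.remove(v)
      acc.distinctCount -= 1L
      if (acc.distinctCount == 0L) {
        acc.min = Int.MaxValue        // no values left: back to the initial value
      } else if (v == acc.min) {
        acc.min = acc.map.keys.min    // the minimum was retracted: rescan for the new one
      }
    case Some(n) =>
      acc.map.update(v, n - 1L)       // occurrences remain; just decrement the count
  }
}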
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
index e4d938d4786..2e9dac5e7e5 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/table/AggregateITCase.scala
@@ -203,13 +203,13 @@ class AggregateITCase extends StreamingWithStateTestBase {
       .groupBy('b)
       .select('a.count as 'cnt, 'b)
       .groupBy('cnt)
-      .select('cnt, 'b.count as 'freq)
+      .select('cnt, 'b.count as 'freq, 'b.min as 'min, 'b.max as 'max)
 
     val results = t.toRetractStream[Row](queryConfig)
 
     results.addSink(new RetractingSink)
     env.execute()
-    val expected = List("1,1", "2,1", "3,1", "4,1", "5,1", "6,1")
+    val expected = List("1,1,1,1", "2,1,2,2", "3,1,3,3", "4,1,4,4", "5,1,5,5", "6,1,6,6")
     assertEquals(expected.sorted, StreamITCase.retractedResults.sorted)
   }
 
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
index cff8f1859c6..fbba03b2a25 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OptimizerNode.java
@@ -1148,7 +1148,7 @@ public String toString() {
 		/**
 		 * Creates a new branching descriptor.
 		 *
-		 * @param branchingNode The node where the branch occurred (teh node with multiple outputs).
+		 * @param branchingNode The node where the branch occurred (the node with multiple outputs).
 		 * @param joinedPathsVector A bit vector describing which branches are tracked by this descriptor.
 		 *                          The bit vector is one, where the branch is tracked, zero otherwise.
 		 */
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
index 59536a16a1a..f4740aceea4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/librarycache/FlinkUserCodeClassLoaders.java
@@ -73,7 +73,7 @@ public static ResolveOrder fromString(String resolveOrder) {
 	}
 
 	/**
-	 * Regular URLClassLoader that first loads from the parent and only after that form the URLs.
+	 * Regular URLClassLoader that first loads from the parent and only after that from the URLs.
 	 */
 	static class ParentFirstClassLoader extends URLClassLoader {
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
index ea96d7d43d2..114d2811ef9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperHaServices.java
@@ -44,7 +44,7 @@
 
 /**
  * An implementation of the {@link HighAvailabilityServices} using Apache ZooKeeper.
- * The services store data in ZooKeeper's nodes as illustrated by teh following tree structure:
+ * The services store data in ZooKeeper's nodes as illustrated by the following tree structure:
  * 
  * <pre>
  * /flink
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 24448c7db0c..b8c2cdce29d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -841,7 +841,7 @@ object AkkaUtils {
     *
     * @param akkaURL The URL to extract the host and port from.
     * @throws java.lang.Exception Thrown, if the given string does not represent a proper url
-    * @return The InetSocketAddress with teh extracted host and port.
+    * @return The InetSocketAddress with the extracted host and port.
     */
   @throws(classOf[Exception])
   def getInetSocketAddressFromAkkaURL(akkaURL: String): InetSocketAddress = {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index a9bd1f694a3..f51ab313871 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -1951,7 +1951,7 @@ public void testListStateMerging() throws Exception {
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
+			//  - def spreads the values over two namespaces (one empty)
 			//  - ghi is empty
 			//  - jkl has all elements already in the target namespace
 			//  - mno has all elements already in one source namespace
@@ -2223,7 +2223,7 @@ public void testReducingStateMerging() throws Exception {
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
+			//  - def spreads the values over two namespaces (one empty)
 			//  - ghi is empty
 			//  - jkl has all elements already in the target namespace
 			//  - mno has all elements already in one source namespace
@@ -2396,7 +2396,7 @@ public void testAggregatingStateMergingWithMutableAccumulator() throws Exception
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
+			//  - def spreads the values over two namespaces (one empty)
 			//  - ghi is empty
 			//  - jkl has all elements already in the target namespace
 			//  - mno has all elements already in one source namespace
@@ -2569,7 +2569,7 @@ public void testAggregatingStateMergingWithImmutableAccumulator() throws Excepti
 
 			// populate the different namespaces
 			//  - abc spreads the values over three namespaces
-			//  - def spreads teh values over two namespaces (one empty)
+			//  - def spreads the values over two namespaces (one empty)
 			//  - ghi is empty
 			//  - jkl has all elements already in the target namespace
 			//  - mno has all elements already in one source namespace
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
index ae95408af44..700043092df 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/BufferSpiller.java
@@ -94,7 +94,7 @@
 	/**
 	 * Creates a new buffer spiller, spilling to one of the I/O manager's temp directories.
 	 *
-	 * @param ioManager The I/O manager for access to teh temp directories.
+	 * @param ioManager The I/O manager for access to the temp directories.
 	 * @param pageSize The page size used to re-create spilled buffers.
 	 * @throws IOException Thrown if the temp files for spilling cannot be initialized.
 	 */
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
index 3f174b05d4a..acda4898394 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamRecordWriter.java
@@ -36,7 +36,7 @@
 @Internal
 public class StreamRecordWriter<T extends IOReadableWritable> extends RecordWriter<T> {
 
-	/** Default name for teh output flush thread, if no name with a task reference is given. */
+	/** Default name for the output flush thread, if no name with a task reference is given. */
 	private static final String DEFAULT_OUTPUT_FLUSH_THREAD_NAME = "OutputFlusher";
 
 
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index b63ad11bf9a..2f6691f486c 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -1652,7 +1652,7 @@ protected ContainerLaunchContext setupApplicationMasterContainer(
 	}
 
 	/**
-	 * Creates a YarnClusterClient; may be overriden in tests.
+	 * Creates a YarnClusterClient; may be overridden in tests.
 	 */
 	protected abstract ClusterClient<ApplicationId> createYarnClusterClient(
 			AbstractYarnClusterDescriptor descriptor,
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index e5311cdd2da..931b22c0373 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -253,7 +253,7 @@ public FlinkYarnSessionCli(
 				yarnApplicationIdFromYarnProperties = ConverterUtils.toApplicationId(yarnApplicationIdString);
 			}
 			catch (Exception e) {
-				throw new FlinkException("YARN properties contains an invalid entry for " +
+				throw new FlinkException("YARN properties contain an invalid entry for " +
 					"application id: " + yarnApplicationIdString + ". Please delete the file at " +
 					yarnPropertiesLocation.getAbsolutePath(), e);
 			}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services