Posted to commits@flink.apache.org by al...@apache.org on 2017/03/06 15:52:09 UTC

[1/5] flink git commit: [FLINK-4460] Update doc: ProcessFunction now possible on DataStream

Repository: flink
Updated Branches:
  refs/heads/master 62517ca4b -> 746c1efa5


[FLINK-4460] Update doc: ProcessFunction now possible on DataStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/746c1efa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/746c1efa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/746c1efa

Branch: refs/heads/master
Commit: 746c1efa504267e18ddc6305712d4c3717ba4cb4
Parents: 06740fb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Mar 6 12:24:46 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100

----------------------------------------------------------------------
 docs/dev/stream/process_function.md | 33 +++++++++++++++++---------------
 1 file changed, 18 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/746c1efa/docs/dev/stream/process_function.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/process_function.md b/docs/dev/stream/process_function.md
index 22295be..1f93f68 100644
--- a/docs/dev/stream/process_function.md
+++ b/docs/dev/stream/process_function.md
@@ -32,32 +32,35 @@ The `ProcessFunction` is a low-level stream processing operation, giving access
 all (acyclic) streaming applications:
 
   - events (stream elements)
-  - state (fault tolerant, consistent)
-  - timers (event time and processing time)
+  - state (fault tolerant, consistent, only on keyed stream)
+  - timers (event time and processing time, only on keyed stream)
 
 The `ProcessFunction` can be thought of as a `FlatMapFunction` with access to keyed state and timers. It handles events
 by being invoked for each event received in the input stream(s).
 
 For fault tolerant state, the `ProcessFunction` gives access to Flink's [keyed state](state.html), accessible via the
-`RuntimeContext`, similar to the way other stateful functions can access keyed state. Like all functions with keyed state,
-the `ProcessFunction` needs to be applied onto a `KeyedStream`:
-```java
-stream.keyBy("id").process(new MyProcessFunction())
-```
+`RuntimeContext`, similar to the way other stateful functions can access keyed state.
 
 The timers allow applications to react to changes in processing time and in [event time](../event_time.html).
 Every call to the function `processElement(...)` gets a `Context` object with gives access to the element's
 event time timestamp, and to the *TimerService*. The `TimerService` can be used to register callbacks for future
-event-/processing- time instants. When a timer's particular time is reached, the `onTimer(...)` method is
+event-/processing-time instants. When a timer's particular time is reached, the `onTimer(...)` method is
 called. During that call, all states are again scoped to the key with which the timer was created, allowing
 timers to perform keyed state manipulation as well.
 
+<span class="label label-info">Note</span> If you want to access keyed state and timers you have
+to apply the `ProcessFunction` on a keyed stream:
+
+{% highlight java %}
+stream.keyBy(...).process(new MyProcessFunction())
+{% endhighlight %}
+
 
 ## Low-level Joins
 
-To realize low-level operations on two inputs, applications can use `CoProcessFunction`. It relates to `ProcessFunction`
-in the same way that `CoFlatMapFunction` relates to `FlatMapFunction`: the function is bound to two different inputs and
-gets individual calls to `processElement1(...)` and `processElement2(...)` for records from the two different inputs.
+To realize low-level operations on two inputs, applications can use `CoProcessFunction`. This
+function is bound to two different inputs and gets individual calls to `processElement1(...)` and
+`processElement2(...)` for records from the two different inputs.
 
 Implementing a low level join typically follows this pattern:
 
@@ -82,8 +85,8 @@ The following example maintains counts per key, and emits a key/count pair whene
   - Upon each callback, it checks the callback's event time timestamp against the last-modification time of the stored count
     and emits the key/count if they match (i.e., no further update occurred during that minute)
 
-*Note:* This simple example could have been implemented with session windows. We use `ProcessFunction` here to illustrate
-the basic pattern it provides.
+<span class="label label-info">Note</span> This simple example could have been implemented with
+session windows. We use `ProcessFunction` here to illustrate the basic pattern it provides.
 
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
@@ -207,7 +210,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
     // initialize or retrieve/update the state
 
     val current: CountWithTimestamp = state.value match {
-      case null => 
+      case null =>
         CountWithTimestamp(key, 1, ctx.timestamp)
       case CountWithTimestamp(key, count, time) =>
         CountWithTimestamp(key, count + 1, ctx.timestamp)
@@ -222,7 +225,7 @@ class TimeoutStateFunction extends ProcessFunction[(String, Long), (String, Long
 
   override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
     state.value match {
-      case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) => 
+      case CountWithTimestamp(key, count, lastModified) if (lastModified == timestamp) =>
         out.collect((key, count))
       case _ =>
     }


[3/5] flink git commit: [FLINK-4460] Make ProcessFunction abstract, add default onTimer() method

Posted by al...@apache.org.
[FLINK-4460] Make ProcessFunction abstract, add default onTimer() method

This is in preparation for allowing ProcessFunction on DataStream, because
we will use it to allow side outputs from the ProcessFunction Context.
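
A minimal sketch of the shape this commit gives the class, using simplified
stand-in types rather than Flink's own. `SketchProcessFunction`,
`SketchContext`, `Doubler` and `SketchDriver` are hypothetical names, and
`Consumer` stands in for `Collector`; checked exceptions are left out for
brevity. It illustrates two points visible in the diff: a subclass only has to
override `processElement`, because `onTimer` now has an empty default body, and
a context implementation declared outside the function has to bind itself to a
function instance through a qualified `function.super()` call, because
`Context` became a non-static inner class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified stand-in for the reworked class; NOT Flink's ProcessFunction.
abstract class SketchProcessFunction<I, O> {

    // Every subclass must implement this.
    public abstract void processElement(I value, Context ctx, Consumer<O> out);

    // Empty default body: subclasses that never set timers can ignore it.
    public void onTimer(long timestamp, Context ctx, Consumer<O> out) {}

    // Non-static inner class: each Context belongs to one function instance.
    public abstract class Context {
        public abstract Long timestamp();
    }
}

// Context implementation declared outside the function (hypothetical name).
final class SketchContext<I, O> extends SketchProcessFunction<I, O>.Context {

    Long current;

    SketchContext(SketchProcessFunction<I, O> function) {
        // Qualified superclass constructor call: supplies the enclosing
        // function instance that the inner class Context requires.
        function.super();
    }

    @Override
    public Long timestamp() {
        return current;
    }
}

// Overrides only processElement; onTimer is inherited unchanged.
final class Doubler extends SketchProcessFunction<Integer, String> {
    @Override
    public void processElement(Integer value, Context ctx, Consumer<String> out) {
        out.accept(value * 2 + "@" + ctx.timestamp());
    }
}

final class SketchDriver {
    static List<String> run() {
        Doubler fn = new Doubler();
        SketchContext<Integer, String> ctx = new SketchContext<>(fn);
        List<String> results = new ArrayList<>();

        ctx.current = 10L;
        fn.processElement(21, ctx, results::add);
        fn.onTimer(99L, ctx, results::add); // inherited no-op, emits nothing
        return results;
    }
}
```

Calling `SketchDriver.run()` returns a list with the single element `42@10`;
the inherited `onTimer` contributes nothing.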


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/82eddcad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/82eddcad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/82eddcad

Branch: refs/heads/master
Commit: 82eddcad9674b09e90db921bc874b43b64796420
Parents: 62517ca
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 10:57:12 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java   |  9 ----
 .../api/functions/ProcessFunction.java          | 43 ++++++++++++--------
 .../api/functions/RichProcessFunction.java      | 40 ------------------
 .../api/operators/ProcessOperator.java          | 24 +++++------
 .../api/operators/ProcessOperatorTest.java      |  9 ++--
 .../flink/streaming/api/scala/KeyedStream.scala |  6 +--
 .../streaming/api/scala/DataStreamTest.scala    |  8 ++--
 7 files changed, 45 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 7f33275..1fcd5bb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -32,7 +32,6 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
-import org.apache.flink.streaming.api.functions.RichProcessFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -181,10 +180,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * function, this function can also query the time and set timers. When reacting to the firing
 	 * of set timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link RichProcessFunction}
-	 * can be used to gain access to features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 *
 	 * @param processFunction The {@link ProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
@@ -216,10 +211,6 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 * function, this function can also query the time and set timers. When reacting to the firing
 	 * of set timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link RichProcessFunction}
-	 * can be used to gain access to features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 *
 	 * @param processFunction The {@link ProcessFunction} that is called for each element
 	 *                      in the stream.
 	 * @param outputType {@link TypeInformation} for the result type of the function.

http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index fd0a829..48418af 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
@@ -27,22 +27,29 @@ import org.apache.flink.util.Collector;
 /**
  * A function that processes elements of a stream.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output. The function can also query the time and set timers. When
- * reacting to the firing of set timers the function can emit yet more elements.
+ * <p>For every element in the input stream {@link #processElement(Object, Context, Collector)}
+ * is invoked. This can produce zero or more elements as output. Implementations can also
+ * query the time and set timers through the provided {@link Context}. For firing timers
+ * {@link #onTimer(long, OnTimerContext, Collector)} will be invoked. This can again produce
+ * zero or more elements as output and register further timers.
  *
- * <p>The function will be called for every element in the input stream and can produce
- * zero or more output elements. Contrary to the
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
- * the time (both event and processing) and set timers, through the provided {@link Context}.
- * When reacting to the firing of set timers the function can directly emit a result, and/or
- * register a timer that will trigger an action in the future.
+ * <p><b>NOTE:</b> Access to keyed state and timers (which are also scoped to a key) is only
+ * available if the {@code ProcessFunction} is applied on a {@code KeyedStream}.
+ *
+ * <p><b>NOTE:</b> A {@code ProcessFunction} is always a
+ * {@link org.apache.flink.api.common.functions.RichFunction}. Therefore, access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} is always available and setup and
+ * teardown methods can be implemented. See
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
  *
  * @param <I> Type of the input elements.
  * @param <O> Type of the output elements.
  */
 @PublicEvolving
-public interface ProcessFunction<I, O> extends Function {
+public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = 1L;
 
 	/**
 	 * Process one element from the input stream.
@@ -59,7 +66,7 @@ public interface ProcessFunction<I, O> extends Function {
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */
-	void processElement(I value, Context ctx, Collector<O> out) throws Exception;
+	public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
 
 	/**
 	 * Called when a timer set using {@link TimerService} fires.
@@ -74,13 +81,13 @@ public interface ProcessFunction<I, O> extends Function {
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */
-	void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
+	public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
 
 	/**
 	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
 	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	interface Context {
+	public abstract class Context {
 
 		/**
 		 * Timestamp of the element currently being processed or timestamp of a firing timer.
@@ -88,22 +95,22 @@ public interface ProcessFunction<I, O> extends Function {
 		 * <p>This might be {@code null}, for example if the time characteristic of your program
 		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
 		 */
-		Long timestamp();
+		public abstract Long timestamp();
 
 		/**
 		 * A {@link TimerService} for querying time and registering timers.
 		 */
-		TimerService timerService();
+		public abstract TimerService timerService();
 	}
 
 	/**
 	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	interface OnTimerContext extends Context {
+	public abstract class OnTimerContext extends Context {
 		/**
 		 * The {@link TimeDomain} of the firing timer.
 		 */
-		TimeDomain timeDomain();
+		public abstract TimeDomain timeDomain();
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
deleted file mode 100644
index 834f717..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Rich variant of the {@link ProcessFunction}. As a
- * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichProcessFunction<I, O>
-		extends AbstractRichFunction
-		implements ProcessFunction<I, O> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 3b13360..7b3617c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -38,11 +38,9 @@ public class ProcessOperator<K, IN, OUT>
 
 	private transient TimestampedCollector<OUT> collector;
 
-	private transient TimerService timerService;
+	private transient ContextImpl<IN, OUT> context;
 
-	private transient ContextImpl<IN> context;
-
-	private transient OnTimerContextImpl onTimerContext;
+	private transient OnTimerContextImpl<IN, OUT> onTimerContext;
 
 	public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
 		super(flatMapper);
@@ -58,10 +56,10 @@ public class ProcessOperator<K, IN, OUT>
 		InternalTimerService<VoidNamespace> internalTimerService =
 				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
 
-		this.timerService = new SimpleTimerService(internalTimerService);
+		TimerService timerService = new SimpleTimerService(internalTimerService);
 
-		context = new ContextImpl<>(timerService);
-		onTimerContext = new OnTimerContextImpl(timerService);
+		context = new ContextImpl<>(userFunction, timerService);
+		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
 	}
 
 	@Override
@@ -92,13 +90,14 @@ public class ProcessOperator<K, IN, OUT>
 		context.element = null;
 	}
 
-	private static class ContextImpl<T> implements ProcessFunction.Context {
+	private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
 
 		private final TimerService timerService;
 
-		private StreamRecord<T> element;
+		private StreamRecord<IN> element;
 
-		ContextImpl(TimerService timerService) {
+		ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
 
@@ -119,7 +118,7 @@ public class ProcessOperator<K, IN, OUT>
 		}
 	}
 
-	private static class OnTimerContextImpl implements ProcessFunction.OnTimerContext{
+	private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{
 
 		private final TimerService timerService;
 
@@ -127,7 +126,8 @@ public class ProcessOperator<K, IN, OUT>
 
 		private InternalTimer<?, VoidNamespace> timer;
 
-		OnTimerContextImpl(TimerService timerService) {
+		OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index 89d9899..b946646 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.RichProcessFunction;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -286,7 +285,7 @@ public class ProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class QueryingFlatMapFunction implements ProcessFunction<Integer, String> {
+	private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -313,7 +312,7 @@ public class ProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class TriggeringFlatMapFunction implements ProcessFunction<Integer, Integer> {
+	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -344,7 +343,7 @@ public class ProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class TriggeringStatefulFlatMapFunction extends RichProcessFunction<Integer, String> {
+	private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -378,7 +377,7 @@ public class ProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class BothTriggeringFlatMapFunction implements ProcessFunction<Integer, String> {
+	private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 99936e7..489b571 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.state.{FoldingStateDescriptor, ReducingStateD
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
 import org.apache.flink.streaming.api.datastream.{QueryableStateStream, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
-import org.apache.flink.streaming.api.functions.{ProcessFunction, RichProcessFunction}
+import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -66,10 +66,6 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     * function, this function can also query the time and set timers. When reacting to the firing
     * of set timers the function can directly emit elements and/or register yet more timers.
     *
-    * A [[RichProcessFunction]]
-    * can be used to gain access to features provided by the
-    * [[org.apache.flink.api.common.functions.RichFunction]]
-    *
     * @param processFunction The [[ProcessFunction]] that is called for each element
     *                   in the stream.
     */

http://git-wip-us.apache.org/repos/asf/flink/blob/82eddcad/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 841567a..b546577 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.operators.ResourceSpec
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.functions.ProcessFunction.{Context, OnTimerContext}
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
 import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, StreamOperator}
@@ -404,10 +403,9 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val src = env.generateSequence(0, 0)
 
     val processFunction = new ProcessFunction[Long, Int] {
-      override def processElement(value: Long, ctx: Context, out: Collector[Int]): Unit = ???
-      override def onTimer(
-          timestamp: Long,
-          ctx: OnTimerContext,
+      override def processElement(
+          value: Long,
+          ctx: ProcessFunction[Long, Int]#Context,
           out: Collector[Int]): Unit = ???
     }
 


[5/5] flink git commit: [FLINK-4660] Allow ProcessFunction on DataStream

Posted by al...@apache.org.
[FLINK-4660] Allow ProcessFunction on DataStream

Introduce new ProcessOperator for this. Rename the pre-existing
ProcessOperator to KeyedProcessOperator.
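
A rough, self-contained model of why two operators are useful, written with
hypothetical stand-in types (`SketchTimerService`, `KeyedSketchTimers`,
`NonKeyedSketchTimers`, `TimerScopeDemo`) rather than Flink classes. The
documentation change in this series states that keyed state and timers are only
available on a keyed stream. How the non-keyed operator reacts to a timer
registration is not shown in this excerpt, so the exception below is purely an
assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, reduced timer interface; Flink's TimerService has more methods.
interface SketchTimerService {
    void registerEventTimeTimer(long time);
}

// Keyed path: each timer is remembered together with the key that set it.
final class KeyedSketchTimers implements SketchTimerService {
    final List<String> registered = new ArrayList<>();
    String currentKey;

    @Override
    public void registerEventTimeTimer(long time) {
        registered.add(currentKey + ":" + time);
    }
}

// Non-keyed path: there is no key to scope a timer to.
// ASSUMPTION: exception type and message are illustrative only.
final class NonKeyedSketchTimers implements SketchTimerService {
    @Override
    public void registerEventTimeTimer(long time) {
        throw new UnsupportedOperationException("timers need a keyed stream");
    }
}

final class TimerScopeDemo {
    // Same user logic on both paths: request a callback one minute later.
    static void userLogic(long elementTime, SketchTimerService timers) {
        timers.registerEventTimeTimer(elementTime + 60_000L);
    }

    // Returns true if the non-keyed stand-in refuses the registration.
    static boolean rejectedWithoutKey() {
        try {
            userLogic(1_000L, new NonKeyedSketchTimers());
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

In this model the same user logic succeeds when a key is in scope and fails
when it is not, which is the distinction the separate keyed and non-keyed
operators make explicit.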


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0228676d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0228676d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0228676d

Branch: refs/heads/master
Commit: 0228676d689b04accf1ba7d7c7f8064e121c6d88
Parents: 82eddca
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 11:41:02 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |  56 +++
 .../streaming/api/datastream/KeyedStream.java   |   8 +-
 .../api/operators/KeyedProcessOperator.java     | 151 +++++++
 .../api/operators/ProcessOperator.java          |  95 ++---
 .../flink/streaming/api/DataStreamTest.java     |  45 ++-
 .../api/operators/KeyedProcessOperatorTest.java | 403 +++++++++++++++++++
 .../api/operators/ProcessOperatorTest.java      | 298 +-------------
 .../flink/streaming/api/scala/DataStream.scala  |  24 +-
 .../flink/streaming/api/scala/KeyedStream.scala |   2 +-
 .../streaming/api/scala/DataStreamTest.scala    |  29 +-
 10 files changed, 747 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index c443758..8fcaf6b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -51,6 +51,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
 import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
 import org.apache.flink.streaming.api.functions.TimestampExtractor;
@@ -59,6 +60,7 @@ import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.sink.SocketClientSink;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamFilter;
 import org.apache.flink.streaming.api.operators.StreamFlatMap;
 import org.apache.flink.streaming.api.operators.StreamMap;
@@ -556,6 +558,60 @@ public class DataStream<T> {
 	}
 
 	/**
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
+	 * creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements.
+	 *
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
+	 *                      in the stream.
+	 *
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
+
+		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
+				processFunction,
+				ProcessFunction.class,
+				false,
+				true,
+				getType(),
+				Utils.getCallLocationName(),
+				true);
+
+		return process(processFunction, outType);
+	}
+
+	/**
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
+	 * creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements.
+	 *
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
+	 *                      in the stream.
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> process(
+			ProcessFunction<T, R> processFunction,
+			TypeInformation<R> outputType) {
+
+		ProcessOperator<T, R> operator = new ProcessOperator<>(clean(processFunction));
+
+		return transform("Process", outputType, operator);
+	}
+
+	/**
 	 * Applies a Filter transformation on a {@link DataStream}. The
 	 * transformation calls a {@link FilterFunction} for each element of the
 	 * DataStream and retains only those elements for which the function returns

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 1fcd5bb..7c9f5bc 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -40,7 +40,7 @@ import org.apache.flink.streaming.api.functions.query.QueryableValueStateOperato
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
@@ -187,6 +187,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
+	@Override
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
 
@@ -219,13 +220,14 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
+	@Override
 	@Internal
 	public <R> SingleOutputStreamOperator<R> process(
 			ProcessFunction<T, R> processFunction,
 			TypeInformation<R> outputType) {
 
-		ProcessOperator<KEY, T, R> operator =
-				new ProcessOperator<>(clean(processFunction));
+		KeyedProcessOperator<KEY, T, R> operator =
+				new KeyedProcessOperator<>(clean(processFunction));
 
 		return transform("Process", outputType, operator);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
new file mode 100644
index 0000000..d8dfb0f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class KeyedProcessOperator<K, IN, OUT>
+		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient ContextImpl<IN, OUT> context;
+
+	private transient OnTimerContextImpl<IN, OUT> onTimerContext;
+
+	public KeyedProcessOperator(ProcessFunction<IN, OUT> function) {
+		super(function);
+
+		chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+		TimerService timerService = new SimpleTimerService(internalTimerService);
+
+		context = new ContextImpl<>(userFunction, timerService);
+		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
+
+		private final TimerService timerService;
+
+		private StreamRecord<IN> element;
+
+		ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+			function.super();
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(element != null);
+
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+
+	private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{
+
+		private final TimerService timerService;
+
+		private TimeDomain timeDomain;
+
+		private InternalTimer<?, VoidNamespace> timer;
+
+		OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+			function.super();
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public TimeDomain timeDomain() {
+			checkState(timeDomain != null);
+			return timeDomain;
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(timer != null);
+			return timer.getTimestamp();
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+}
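
Editorial note on the operator above: processElement() and the two timer callbacks assign the per-call fields of the context, invoke the user function, and then reset those fields to null, so that the checkState() guards reject any use of the context outside a callback. The following is a minimal stand-alone sketch of that pattern, not the Flink code itself. Record, Context, UserFunction and Operator are hypothetical stand-ins introduced only for illustration, and a plain IllegalStateException takes the place of checkState().

```java
import java.util.ArrayList;
import java.util.List;

// Stand-alone sketch: every type here is a simplified stand-in, not a Flink class.
class ScopedContextSketch {

    /** Hypothetical record: a value plus an optional (nullable) timestamp. */
    static final class Record {
        final int value;
        final Long timestamp;

        Record(int value, Long timestamp) {
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    /** Context that is only usable while an element is being processed. */
    static final class Context {
        private Record element;

        Long timestamp() {
            if (element == null) {
                throw new IllegalStateException("No element is currently being processed.");
            }
            return element.timestamp; // null when the record carries no timestamp
        }
    }

    /** Hypothetical user callback, loosely shaped like processElement(). */
    interface UserFunction {
        void processElement(int value, Context ctx, List<String> out);
    }

    static final class Operator {
        private final Context context = new Context();
        private final UserFunction function;

        Operator(UserFunction function) {
            this.function = function;
        }

        void processElement(Record record, List<String> out) {
            context.element = record; // make the element visible to the callback
            function.processElement(record.value, context, out);
            context.element = null;   // invalidate the context again
        }
    }

    public static void main(String[] args) {
        List<String> out = new ArrayList<>();
        Context[] leaked = new Context[1];
        Operator operator = new Operator((value, ctx, o) -> {
            leaked[0] = ctx; // keep a reference to show what happens after the call
            o.add(value + " TS:" + ctx.timestamp());
        });

        operator.processElement(new Record(5, 12L), out);
        operator.processElement(new Record(6, null), out);
        System.out.println(out);

        try {
            leaked[0].timestamp();
        } catch (IllegalStateException e) {
            System.out.println("outside a callback: " + e.getMessage());
        }
    }
}
```

Like the operator in the diff, the sketch resets the field without a try/finally block, so an exception thrown by the callback would leave the element set.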

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
index 7b3617c..13b68f4 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -18,32 +18,30 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 @Internal
-public class ProcessOperator<K, IN, OUT>
+public class ProcessOperator<IN, OUT>
 		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+		implements OneInputStreamOperator<IN, OUT> {
 
 	private static final long serialVersionUID = 1L;
 
 	private transient TimestampedCollector<OUT> collector;
 
-	private transient ContextImpl<IN, OUT> context;
+	private transient ContextImpl context;
 
-	private transient OnTimerContextImpl<IN, OUT> onTimerContext;
+	/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
+	private long currentWatermark = Long.MIN_VALUE;
 
-	public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
-		super(flatMapper);
+	public ProcessOperator(ProcessFunction<IN, OUT> function) {
+		super(function);
 
 		chainingStrategy = ChainingStrategy.ALWAYS;
 	}
@@ -53,33 +51,7 @@ public class ProcessOperator<K, IN, OUT>
 		super.open();
 		collector = new TimestampedCollector<>(output);
 
-		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
-		TimerService timerService = new SimpleTimerService(internalTimerService);
-
-		context = new ContextImpl<>(userFunction, timerService);
-		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
-	}
-
-	@Override
-	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	@Override
-	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
+		context = new ContextImpl(userFunction, getProcessingTimeService());
 	}
 
 	@Override
@@ -90,15 +62,23 @@ public class ProcessOperator<K, IN, OUT>
 		context.element = null;
 	}
 
-	private static class ContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.Context {
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		super.processWatermark(mark);
+		this.currentWatermark = mark.getTimestamp();
+	}
 
-		private final TimerService timerService;
+	private class ContextImpl
+			extends ProcessFunction<IN, OUT>.Context
+			implements TimerService {
 
 		private StreamRecord<IN> element;
 
-		ContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
+		private final ProcessingTimeService processingTimeService;
+
+		ContextImpl(ProcessFunction<IN, OUT> function, ProcessingTimeService processingTimeService) {
 			function.super();
-			this.timerService = checkNotNull(timerService);
+			this.processingTimeService = processingTimeService;
 		}
 
 		@Override
@@ -113,39 +93,28 @@ public class ProcessOperator<K, IN, OUT>
 		}
 
 		@Override
-		public TimerService timerService() {
-			return timerService;
+		public long currentProcessingTime() {
+			return processingTimeService.getCurrentProcessingTime();
 		}
-	}
-
-	private static class OnTimerContextImpl<IN, OUT> extends ProcessFunction<IN, OUT>.OnTimerContext{
-
-		private final TimerService timerService;
 
-		private TimeDomain timeDomain;
-
-		private InternalTimer<?, VoidNamespace> timer;
-
-		OnTimerContextImpl(ProcessFunction<IN, OUT> function, TimerService timerService) {
-			function.super();
-			this.timerService = checkNotNull(timerService);
+		@Override
+		public long currentWatermark() {
+			return currentWatermark;
 		}
 
 		@Override
-		public TimeDomain timeDomain() {
-			checkState(timeDomain != null);
-			return timeDomain;
+		public void registerProcessingTimeTimer(long time) {
+			throw new UnsupportedOperationException("Setting timers is only supported on a KeyedStream.");
 		}
 
 		@Override
-		public Long timestamp() {
-			checkState(timer != null);
-			return timer.getTimestamp();
+		public void registerEventTimeTimer(long time) {
+			throw new UnsupportedOperationException("Setting timers is only supported on a KeyedStream.");
 		}
 
 		@Override
 		public TimerService timerService() {
-			return timerService;
+			return this;
 		}
 	}
 }
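
Editorial note on the non-keyed operator above: its context doubles as the TimerService. It answers time queries from the processing-time service and from a watermark that the operator tracks itself in processWatermark(), and it rejects timer registration with an UnsupportedOperationException. The sketch below models that behaviour in isolation. The TimerService interface here is a reduced, hypothetical stand-in with only the four methods visible in the diff, and setProcessingTime() is an assumed helper that replaces the real ProcessingTimeService.

```java
// Stand-alone sketch: simplified stand-ins, not the Flink interfaces themselves.
class NonKeyedTimerServiceSketch {

    /** Hypothetical, reduced timer service with the four methods seen in the diff. */
    interface TimerService {
        long currentProcessingTime();
        long currentWatermark();
        void registerProcessingTimeTimer(long time);
        void registerEventTimeTimer(long time);
    }

    static final class NonKeyedOperator {
        // Tracked by the operator itself, starting at the lowest possible value.
        private long currentWatermark = Long.MIN_VALUE;
        private long processingTime;
        private final TimerService timerService = new QueryOnlyTimerService();

        void processWatermark(long timestamp) {
            currentWatermark = timestamp;
        }

        /** Assumed helper standing in for a real processing-time clock. */
        void setProcessingTime(long time) {
            processingTime = time;
        }

        TimerService timerService() {
            return timerService;
        }

        /** Inner class so that it can read the operator's own fields. */
        private final class QueryOnlyTimerService implements TimerService {
            @Override
            public long currentProcessingTime() {
                return processingTime;
            }

            @Override
            public long currentWatermark() {
                return NonKeyedOperator.this.currentWatermark;
            }

            @Override
            public void registerProcessingTimeTimer(long time) {
                throw new UnsupportedOperationException(
                        "Setting timers is only supported on a KeyedStream.");
            }

            @Override
            public void registerEventTimeTimer(long time) {
                throw new UnsupportedOperationException(
                        "Setting timers is only supported on a KeyedStream.");
            }
        }
    }

    public static void main(String[] args) {
        NonKeyedOperator operator = new NonKeyedOperator();
        operator.processWatermark(17L);
        operator.setProcessingTime(42L);

        TimerService timers = operator.timerService();
        System.out.println("watermark: " + timers.currentWatermark());
        System.out.println("processing time: " + timers.currentProcessingTime());

        try {
            timers.registerEventTimeTimer(timers.currentWatermark() + 5);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Querying time therefore works on any stream in this model, while registering a timer fails at call time rather than at job construction time, which is the behaviour the exception messages in the diff suggest.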

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 12af1d4..a619338 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.Partitioner;
-import org.apache.flink.api.common.operators.ResourceSpec;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -47,8 +46,9 @@ import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.ProcessOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -643,7 +643,7 @@ public class DataStreamTest {
 	 * an operator.
 	 */
 	@Test
-	public void testProcessTranslation() {
+	public void testKeyedProcessTranslation() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		DataStreamSource<Long> src = env.generateSequence(0, 0);
 
@@ -674,9 +674,48 @@ public class DataStreamTest {
 		processed.addSink(new DiscardingSink<Integer>());
 
 		assertEquals(processFunction, getFunctionForDataStream(processed));
+		assertTrue(getOperatorForDataStream(processed) instanceof KeyedProcessOperator);
+	}
+
+	/**
+	 * Verify that a {@link DataStream#process(ProcessFunction)} call is correctly translated to
+	 * an operator.
+	 */
+	@Test
+	public void testProcessTranslation() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Long> src = env.generateSequence(0, 0);
+
+		ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void processElement(
+					Long value,
+					Context ctx,
+					Collector<Integer> out) throws Exception {
+
+			}
+
+			@Override
+			public void onTimer(
+					long timestamp,
+					OnTimerContext ctx,
+					Collector<Integer> out) throws Exception {
+
+			}
+		};
+
+		DataStream<Integer> processed = src
+				.process(processFunction);
+
+		processed.addSink(new DiscardingSink<Integer>());
+
+		assertEquals(processFunction, getFunctionForDataStream(processed));
 		assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
 	}
 
+
 	@Test
 	public void operatorTest() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
new file mode 100644
index 0000000..32953fc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/KeyedProcessOperatorTest.java
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link KeyedProcessOperator}.
+ */
+public class KeyedProcessOperatorTest extends TestLogger {
+
+	@Test
+	public void testTimestampAndWatermarkQuerying() throws Exception {
+
+		KeyedProcessOperator<Integer, Integer, String> operator =
+				new KeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(17));
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark(new Watermark(42));
+		testHarness.processElement(new StreamRecord<>(6, 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:12", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:13", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+		KeyedProcessOperator<Integer, Integer, String> operator =
+				new KeyedProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement(new StreamRecord<>(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5TIME:17 TS:null"));
+		expectedOutput.add(new StreamRecord<>("6TIME:42 TS:null"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeTimers() throws Exception {
+
+		KeyedProcessOperator<Integer, Integer, Integer> operator =
+				new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(0));
+
+		testHarness.processElement(new StreamRecord<>(17, 42L));
+
+		testHarness.processWatermark(new Watermark(5));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(0L));
+		expectedOutput.add(new StreamRecord<>(17, 42L));
+		expectedOutput.add(new StreamRecord<>(1777, 5L));
+		expectedOutput.add(new Watermark(5L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTimers() throws Exception {
+
+		KeyedProcessOperator<Integer, Integer, Integer> operator =
+				new KeyedProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(17));
+
+		testHarness.setProcessingTime(5);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>(17));
+		expectedOutput.add(new StreamRecord<>(1777, 5L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testEventTimeTimerWithState() throws Exception {
+
+		KeyedProcessOperator<Integer, Integer, String> operator =
+				new KeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark(new Watermark(1));
+		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
+
+		testHarness.processWatermark(new Watermark(2));
+		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
+
+		testHarness.processWatermark(new Watermark(6));
+		testHarness.processWatermark(new Watermark(7));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(1L));
+		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
+		expectedOutput.add(new Watermark(2L));
+		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new Watermark(6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new Watermark(7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that we don't have leakage between different keys.
+	 */
+	@Test
+	public void testProcessingTimeTimerWithState() throws Exception {
+
+		KeyedProcessOperator<Integer, Integer, String> operator =
+				new KeyedProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(1);
+		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
+
+		testHarness.setProcessingTime(2);
+		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
+
+		testHarness.setProcessingTime(6);
+		testHarness.setProcessingTime(7);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT:42"));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+
+		KeyedProcessOperator<Integer, Integer, String> operator =
+				new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
+				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<>(5, 12L));
+
+		// snapshot and restore from scratch
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		testHarness.close();
+
+		operator = new KeyedProcessOperator<>(new BothTriggeringFlatMapFunction());
+
+		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.setProcessingTime(5);
+		testHarness.processWatermark(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+		expectedOutput.add(new Watermark(6));
+
+		System.out.println("GOT: " + testHarness.getOutput());
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public QueryingFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				out.collect(value + "TIME:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+			} else {
+				out.collect(value + "TIME:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
+			out.collect(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+			} else {
+				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<Integer> out) throws Exception {
+
+			assertEquals(this.timeDomain, ctx.timeDomain());
+			out.collect(1777);
+		}
+	}
+
+	private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<Integer> state =
+				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
+
+		private final TimeDomain timeDomain;
+
+		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
+			this.timeDomain = timeDomain;
+		}
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT:" + value);
+			getRuntimeContext().getState(state).update(value);
+			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
+				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+			} else {
+				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+			}
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			assertEquals(this.timeDomain, ctx.timeDomain());
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
+			ctx.timerService().registerProcessingTimeTimer(5);
+			ctx.timerService().registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+				out.collect("EVENT:1777");
+			} else {
+				out.collect("PROC:1777");
+			}
+		}
+	}
+
+}
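
Note on the timer tests above: the triggering functions register an event-time timer at currentWatermark() + 5, and the expected output of testEventTimeTimers places the timer's record (1777, timestamp 5) before the forwarded Watermark(5). The following is a minimal sketch of that ordering. It assumes a toy model in plain Java; ToyEventTimeTimers and its methods are hypothetical stand-ins, not Flink's TimerService or operator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model of event-time timers; a hypothetical stand-in, not Flink's TimerService.
class ToyEventTimeTimers {
    private long currentWatermark = Long.MIN_VALUE;
    // Sorted set of pending timer timestamps; duplicates collapse in this toy model.
    private final TreeSet<Long> timers = new TreeSet<>();
    private final List<String> output = new ArrayList<>();

    // Mirrors the processElement shape above: emit the value, then set a timer
    // five units past the current watermark.
    void processElement(int value) {
        output.add("element:" + value);
        timers.add(currentWatermark + 5);
    }

    // Advancing the watermark fires every timer at or before it, in timestamp
    // order, and only then records the watermark itself.
    void processWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            long timestamp = timers.pollFirst();
            output.add("timer:1777@" + timestamp);
        }
        output.add("watermark:" + watermark);
    }

    List<String> output() {
        return output;
    }

    // Replays the sequence used by testEventTimeTimers: watermark 0, element 17, watermark 5.
    static List<String> demo() {
        ToyEventTimeTimers op = new ToyEventTimeTimers();
        op.processWatermark(0);
        op.processElement(17);
        op.processWatermark(5);
        return op.output();
    }
}
```

Calling ToyEventTimeTimers.demo() yields the element, then the timer output stamped with 5, then the watermark, which is the same relative order the test asserts. The real operator additionally scopes timers per key, which this sketch leaves out.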

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
index b946646..c37fe48 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/ProcessOperatorTest.java
@@ -18,16 +18,10 @@
 package org.apache.flink.streaming.api.operators;
 
 
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.IntSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
@@ -36,8 +30,6 @@ import org.junit.Test;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 
-import static org.junit.Assert.assertEquals;
-
 /**
  * Tests {@link ProcessOperator}.
  */
@@ -46,11 +38,11 @@ public class ProcessOperatorTest extends TestLogger {
 	@Test
 	public void testTimestampAndWatermarkQuerying() throws Exception {
 
-		ProcessOperator<Integer, Integer, String> operator =
-				new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+		ProcessOperator<Integer, String> operator =
+				new ProcessOperator<>(new QueryingProcessFunction(TimeDomain.EVENT_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.setup();
 		testHarness.open();
@@ -76,11 +68,11 @@ public class ProcessOperatorTest extends TestLogger {
 	@Test
 	public void testTimestampAndProcessingTimeQuerying() throws Exception {
 
-		ProcessOperator<Integer, Integer, String> operator =
-				new ProcessOperator<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+		ProcessOperator<Integer, String> operator =
+				new ProcessOperator<>(new QueryingProcessFunction(TimeDomain.PROCESSING_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
+				new OneInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.setup();
 		testHarness.open();
@@ -100,198 +92,13 @@ public class ProcessOperatorTest extends TestLogger {
 
 		testHarness.close();
 	}
-
-	@Test
-	public void testEventTimeTimers() throws Exception {
-
-		ProcessOperator<Integer, Integer, Integer> operator =
-				new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark(new Watermark(0));
-
-		testHarness.processElement(new StreamRecord<>(17, 42L));
-
-		testHarness.processWatermark(new Watermark(5));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(0L));
-		expectedOutput.add(new StreamRecord<>(17, 42L));
-		expectedOutput.add(new StreamRecord<>(1777, 5L));
-		expectedOutput.add(new Watermark(5L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testProcessingTimeTimers() throws Exception {
-
-		ProcessOperator<Integer, Integer, Integer> operator =
-				new ProcessOperator<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(17));
-
-		testHarness.setProcessingTime(5);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>(17));
-		expectedOutput.add(new StreamRecord<>(1777, 5L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testEventTimeTimerWithState() throws Exception {
-
-		ProcessOperator<Integer, Integer, String> operator =
-				new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark(new Watermark(1));
-		testHarness.processElement(new StreamRecord<>(17, 0L)); // should set timer for 6
-
-		testHarness.processWatermark(new Watermark(2));
-		testHarness.processElement(new StreamRecord<>(42, 1L)); // should set timer for 7
-
-		testHarness.processWatermark(new Watermark(6));
-		testHarness.processWatermark(new Watermark(7));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(1L));
-		expectedOutput.add(new StreamRecord<>("INPUT:17", 0L));
-		expectedOutput.add(new Watermark(2L));
-		expectedOutput.add(new StreamRecord<>("INPUT:42", 1L));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new Watermark(6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-		expectedOutput.add(new Watermark(7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testProcessingTimeTimerWithState() throws Exception {
-
-		ProcessOperator<Integer, Integer, String> operator =
-				new ProcessOperator<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.setProcessingTime(1);
-		testHarness.processElement(new StreamRecord<>(17)); // should set timer for 6
-
-		testHarness.setProcessingTime(2);
-		testHarness.processElement(new StreamRecord<>(42)); // should set timer for 7
-
-		testHarness.setProcessingTime(6);
-		testHarness.setProcessingTime(7);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT:17"));
-		expectedOutput.add(new StreamRecord<>("INPUT:42"));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testSnapshotAndRestore() throws Exception {
-
-		ProcessOperator<Integer, Integer, String> operator =
-				new ProcessOperator<>(new BothTriggeringFlatMapFunction());
-
-		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
-				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement(new StreamRecord<>(5, 12L));
-
-		// snapshot and restore from scratch
-		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
-		testHarness.close();
-
-		operator = new ProcessOperator<>(new BothTriggeringFlatMapFunction());
-
-		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.open();
-
-		testHarness.setProcessingTime(5);
-		testHarness.processWatermark(new Watermark(6));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
-		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
-		expectedOutput.add(new Watermark(6));
-
-		System.out.println("GOT: " + testHarness.getOutput());
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public T getKey(T value) throws Exception {
-			return value;
-		}
-	}
-
-	private static class QueryingFlatMapFunction extends ProcessFunction<Integer, String> {
+	private static class QueryingProcessFunction extends ProcessFunction<Integer, String> {
 
 		private static final long serialVersionUID = 1L;
 
 		private final TimeDomain timeDomain;
 
-		public QueryingFlatMapFunction(TimeDomain timeDomain) {
+		public QueryingProcessFunction(TimeDomain timeDomain) {
 			this.timeDomain = timeDomain;
 		}
 
@@ -311,93 +118,4 @@ public class ProcessOperatorTest extends TestLogger {
 				Collector<String> out) throws Exception {
 		}
 	}
-
-	private static class TriggeringFlatMapFunction extends ProcessFunction<Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final TimeDomain timeDomain;
-
-		public TriggeringFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
-		}
-
-		@Override
-		public void processElement(Integer value, Context ctx, Collector<Integer> out) throws Exception {
-			out.collect(value);
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-			} else {
-				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-			}
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<Integer> out) throws Exception {
-
-			assertEquals(this.timeDomain, ctx.timeDomain());
-			out.collect(1777);
-		}
-	}
-
-	private static class TriggeringStatefulFlatMapFunction extends ProcessFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<Integer> state =
-				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE);
-
-		private final TimeDomain timeDomain;
-
-		public TriggeringStatefulFlatMapFunction(TimeDomain timeDomain) {
-			this.timeDomain = timeDomain;
-		}
-
-		@Override
-		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT:" + value);
-			getRuntimeContext().getState(state).update(value);
-			if (timeDomain.equals(TimeDomain.EVENT_TIME)) {
-				ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-			} else {
-				ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-			}
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			assertEquals(this.timeDomain, ctx.timeDomain());
-			out.collect("STATE:" + getRuntimeContext().getState(state).value());
-		}
-	}
-
-	private static class BothTriggeringFlatMapFunction extends ProcessFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void processElement(Integer value, Context ctx, Collector<String> out) throws Exception {
-			ctx.timerService().registerProcessingTimeTimer(5);
-			ctx.timerService().registerEventTimeTimer(6);
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
-				out.collect("EVENT:1777");
-			} else {
-				out.collect("PROC:1777");
-			}
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 35e1f23..084d389 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -33,7 +33,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.datastream.{AllWindowedStream => JavaAllWindowedStream, DataStream => JavaStream, KeyedStream => JavaKeyedStream, _}
 import org.apache.flink.streaming.api.functions.sink.SinkFunction
 import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor
-import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, TimestampExtractor}
+import org.apache.flink.streaming.api.functions.{AssignerWithPeriodicWatermarks, AssignerWithPunctuatedWatermarks, ProcessFunction, TimestampExtractor}
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
@@ -619,6 +619,28 @@ class DataStream[T](stream: JavaStream[T]) {
   }
 
   /**
+   * Applies the given [[ProcessFunction]] on the input stream, thereby
+   * creating a transformed output stream.
+   *
+   * The function will be called for every element in the stream and can produce
+   * zero or more output elements.
+   *
+   * @param processFunction The [[ProcessFunction]] that is called for each element
+   *                   in the stream.
+   */
+  @PublicEvolving
+  def process[R: TypeInformation](
+      processFunction: ProcessFunction[T, R]): DataStream[R] = {
+
+    if (processFunction == null) {
+      throw new NullPointerException("ProcessFunction must not be null.")
+    }
+
+    asScalaStream(javaStream.process(processFunction, implicitly[TypeInformation[R]]))
+  }
+
+
+  /**
    * Creates a new DataStream that contains only the elements satisfying the given filter predicate.
    */
   def filter(filter: FilterFunction[T]): DataStream[T] = {

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 489b571..d5ef89f 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -70,7 +70,7 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
     *                   in the stream.
     */
   @PublicEvolving
-  def process[R: TypeInformation](
+  override def process[R: TypeInformation](
     processFunction: ProcessFunction[T, R]): DataStream[R] = {
 
     if (processFunction == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0228676d/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index b546577..60c609d 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -27,7 +27,7 @@ import org.apache.flink.streaming.api.collector.selector.OutputSelector
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, ProcessOperator, StreamOperator}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, KeyedProcessOperator, ProcessOperator, StreamOperator}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -397,7 +397,7 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
    * Verify that a [[KeyedStream.process()]] call is correctly translated to an operator.
    */
   @Test
-  def testProcessTranslation(): Unit = {
+  def testKeyedProcessTranslation(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 
     val src = env.generateSequence(0, 0)
@@ -412,9 +412,32 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     val flatMapped = src.keyBy(x => x).process(processFunction)
 
     assert(processFunction == getFunctionForDataStream(flatMapped))
-    assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _, _]])
+    assert(getOperatorForDataStream(flatMapped).isInstanceOf[KeyedProcessOperator[_, _, _]])
+  }
+
+  /**
+    * Verify that a [[DataStream.process()]] call is correctly translated to an operator.
+    */
+  @Test
+  def testProcessTranslation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.generateSequence(0, 0)
+
+    val processFunction = new ProcessFunction[Long, Int] {
+      override def processElement(
+          value: Long,
+          ctx: ProcessFunction[Long, Int]#Context,
+          out: Collector[Int]): Unit = ???
+    }
+
+    val flatMapped = src.process(processFunction)
+
+    assert(processFunction == getFunctionForDataStream(flatMapped))
+    assert(getOperatorForDataStream(flatMapped).isInstanceOf[ProcessOperator[_, _]])
   }
 
+
   @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
 


[2/5] flink git commit: [FLINK-4460] Make CoProcessFunction abstract, add default onTimer() method

Posted by al...@apache.org.
[FLINK-4460] Make CoProcessFunction abstract, add default onTimer() method

This is in preparation for allowing CoProcessFunction on a non-keyed
connected stream. We will use it to allow side outputs from the
ProcessFunction Context.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e12f3203
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e12f3203
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e12f3203

Branch: refs/heads/master
Commit: e12f3203eda7f071daeea7c59ca32b3db7763b68
Parents: 0228676
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 11:33:03 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        |  9 -----
 .../api/functions/co/CoProcessFunction.java     | 24 ++++++------
 .../api/functions/co/RichCoProcessFunction.java | 41 --------------------
 .../api/operators/co/CoProcessOperator.java     | 23 +++++------
 .../api/operators/co/CoProcessOperatorTest.java | 15 ++++---
 .../streaming/api/scala/ConnectedStreams.scala  |  2 +-
 6 files changed, 32 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 96a08d3..66601a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
@@ -243,10 +242,6 @@ public class ConnectedStreams<IN1, IN2> {
 	 * function can also query the time and set timers. When reacting to the firing of set timers
 	 * the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link RichCoProcessFunction}
-	 * can be used to gain access to features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 *
 	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
@@ -274,10 +269,6 @@ public class ConnectedStreams<IN1, IN2> {
 	 * this function can also query the time and set timers. When reacting to the firing of set
 	 * timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link RichCoProcessFunction}
-	 * can be used to gain access to features provided by the
-	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 *
 	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index feff8fb..8811e32 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -19,13 +19,11 @@
 package org.apache.flink.streaming.api.functions.co;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.util.Collector;
 
-import java.io.Serializable;
-
 /**
  * A function that processes elements of two streams and produces a single output one.
  *
@@ -46,7 +44,9 @@ import java.io.Serializable;
  * @param <OUT> Output type.
  */
 @PublicEvolving
-public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable {
+public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunction {
+
+	private static final long serialVersionUID = 1L;
 
 	/**
 	 * This method is called for each element in the first of the connected streams.
@@ -63,7 +63,7 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
+	public abstract void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
 
 	/**
 	 * This method is called for each element in the second of the connected streams.
@@ -80,7 +80,7 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @throws Exception The function may throw exceptions which cause the streaming program
 	 *                   to fail and go into recovery.
 	 */
-	void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+	public abstract void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
 
 	/**
 	 * Called when a timer set using {@link TimerService} fires.
@@ -95,14 +95,14 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
 	 *                   to fail and may trigger recovery.
 	 */
-	void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
+	public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception {}
 
 	/**
 	 * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
 	 * {@link #processElement2(Object, Context, Collector)}
 	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	interface Context {
+	public abstract class Context {
 
 		/**
 		 * Timestamp of the element currently being processed or timestamp of a firing timer.
@@ -110,21 +110,21 @@ public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable
 		 * <p>This might be {@code null}, for example if the time characteristic of your program
 		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
 		 */
-		Long timestamp();
+		public abstract Long timestamp();
 
 		/**
 		 * A {@link TimerService} for querying time and registering timers.
 		 */
-		TimerService timerService();
+		public abstract TimerService timerService();
 	}
 
 	/**
 	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
 	 */
-	interface OnTimerContext extends Context {
+	public abstract class OnTimerContext extends Context {
 		/**
 		 * The {@link TimeDomain} of the firing timer.
 		 */
-		TimeDomain timeDomain();
+		public abstract TimeDomain timeDomain();
 	}
 }
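
Note on the change above: turning the interface into an abstract class has two visible effects. Subclasses inherit a no-op onTimer() and only have to implement the two element methods. Context also becomes a non-static inner class, so an implementation defined outside the function needs an enclosing instance. The sketch below illustrates both points in plain Java. All names in it (TwoInputFunction, FixedContext, Tagger, AbstractFunctionSketch) are hypothetical stand-ins rather than Flink classes, and a List replaces the Collector.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an abstract two-input function; not the Flink API.
abstract class TwoInputFunction<A, B, O> {

    // Subclasses must implement both element handlers.
    public abstract void onFirst(A value, Context ctx, List<O> out);

    public abstract void onSecond(B value, Context ctx, List<O> out);

    // Default no-op, so subclasses that never set timers need not override it.
    public void onTimer(long timestamp, Context ctx, List<O> out) {}

    // Non-static inner class: every instance is bound to an enclosing function.
    public abstract class Context {
        public abstract Long timestamp();
    }
}

// A context implemented outside the enclosing class, as an operator would do.
class FixedContext<A, B, O> extends TwoInputFunction<A, B, O>.Context {
    private final Long timestamp;

    FixedContext(TwoInputFunction<A, B, O> function, Long timestamp) {
        function.super(); // supplies the enclosing instance for the inner superclass
        this.timestamp = timestamp;
    }

    @Override
    public Long timestamp() {
        return timestamp;
    }
}

// Overrides only the two abstract methods and inherits the no-op onTimer.
class Tagger extends TwoInputFunction<Integer, String, String> {
    @Override
    public void onFirst(Integer value, Context ctx, List<String> out) {
        out.add("first:" + value + "@" + ctx.timestamp());
    }

    @Override
    public void onSecond(String value, Context ctx, List<String> out) {
        out.add("second:" + value + "@" + ctx.timestamp());
    }
}

class AbstractFunctionSketch {
    static List<String> run() {
        Tagger fn = new Tagger();
        List<String> out = new ArrayList<>();
        FixedContext<Integer, String, String> ctx = new FixedContext<>(fn, 42L);
        fn.onFirst(17, ctx, out);
        fn.onSecond("x", ctx, out);
        fn.onTimer(5L, ctx, out); // inherited default: emits nothing
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The function.super() call is Java's qualified superclass constructor invocation. It is required whenever a class extends a non-static inner class from outside that class's enclosing instance.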

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
deleted file mode 100644
index 0fcea91..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives
- * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
- * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link RichFunction#close()}.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichCoProcessFunction<IN1, IN2, OUT>
-		extends AbstractRichFunction
-		implements CoProcessFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index e6c3d3f..ed99815 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -44,11 +44,9 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 
 	private transient TimestampedCollector<OUT> collector;
 
-	private transient TimerService timerService;
+	private transient ContextImpl<IN1, IN2, OUT> context;
 
-	private transient ContextImpl context;
-
-	private transient OnTimerContextImpl onTimerContext;
+	private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
 
 	public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
@@ -62,10 +60,10 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		InternalTimerService<VoidNamespace> internalTimerService =
 				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
 
-		this.timerService = new SimpleTimerService(internalTimerService);
+		TimerService timerService = new SimpleTimerService(internalTimerService);
 
-		context = new ContextImpl(timerService);
-		onTimerContext = new OnTimerContextImpl(timerService);
+		context = new ContextImpl<>(userFunction, timerService);
+		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
 	}
 
 	@Override
@@ -108,13 +106,14 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		return collector;
 	}
 
-	private static class ContextImpl implements CoProcessFunction.Context {
+	private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
 
 		private final TimerService timerService;
 
 		private StreamRecord<?> element;
 
-		ContextImpl(TimerService timerService) {
+		ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
 
@@ -135,7 +134,8 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		}
 	}
 
-	private static class OnTimerContextImpl implements CoProcessFunction.OnTimerContext {
+	private static class OnTimerContextImpl<IN1, IN2, OUT>
+			extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
 
 		private final TimerService timerService;
 
@@ -143,7 +143,8 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 
 		private InternalTimer<?, VoidNamespace> timer;
 
-		OnTimerContextImpl(TimerService timerService) {
+		OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index eea428f..9d7c444 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -344,7 +343,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class WatermarkQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -366,7 +365,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class EventTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class EventTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -393,7 +392,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class EventTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+	private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -425,7 +424,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class ProcessingTimeTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -452,7 +451,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class ProcessingTimeQueryingProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -474,7 +473,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class ProcessingTimeTriggeringStatefulProcessFunction extends RichCoProcessFunction<Integer, String, String> {
+	private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -506,7 +505,7 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class BothTriggeringProcessFunction implements CoProcessFunction<Integer, String, String> {
+	private static class BothTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e12f3203/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index a7325a4..cebda5d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction, RichCoProcessFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, CoProcessFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.util.Collector
 


[4/5] flink git commit: [FLINK-4660] Allow CoProcessFunction on non-keyed ConnectedStreams

Posted by al...@apache.org.
[FLINK-4660] Allow CoProcessFunction on non-keyed ConnectedStreams

Introduce new CoProcessOperator for this. Rename the pre-existing
CoProcessOperator to KeyedCoProcessOperator.
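
The behavioural split described above can be sketched in plain Java without
any Flink dependency. This is an illustrative model only, not the code from
this commit: the names CoProcessDispatchSketch, TimerContext,
KeyedTimerContext, NonKeyedTimerContext and contextFor are hypothetical
stand-ins. It assumes, as the diff below suggests, that the keyed path is
chosen only when both inputs are keyed, and that the non-keyed context can
report the current watermark but rejects timer registration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free model of the keyed / non-keyed split.
final class CoProcessDispatchSketch {

    /** Stand-in for the timer-facing part of a CoProcessFunction context. */
    interface TimerContext {
        long currentWatermark();
        void registerEventTimeTimer(long time);
    }

    /** Models the keyed path: timers are accepted and remembered. */
    static final class KeyedTimerContext implements TimerContext {
        final List<Long> timers = new ArrayList<>();

        @Override
        public long currentWatermark() {
            return Long.MIN_VALUE;
        }

        @Override
        public void registerEventTimeTimer(long time) {
            timers.add(time);
        }
    }

    /** Models the non-keyed path: time can be queried, timers cannot be set. */
    static final class NonKeyedTimerContext implements TimerContext {
        // Tracked by the operator itself, since there is no timer service to ask.
        private long currentWatermark = Long.MIN_VALUE;

        void onWatermark(long timestamp) {
            currentWatermark = timestamp;
        }

        @Override
        public long currentWatermark() {
            return currentWatermark;
        }

        @Override
        public void registerEventTimeTimer(long time) {
            throw new UnsupportedOperationException(
                    "Setting timers is only supported on keyed streams.");
        }
    }

    /** Keyed context only if both inputs are keyed, otherwise the non-keyed one. */
    static TimerContext contextFor(boolean firstInputKeyed, boolean secondInputKeyed) {
        return (firstInputKeyed && secondInputKeyed)
                ? new KeyedTimerContext()
                : new NonKeyedTimerContext();
    }

    public static void main(String[] args) {
        TimerContext keyed = contextFor(true, true);
        keyed.registerEventTimeTimer(42L); // accepted on the keyed path

        TimerContext nonKeyed = contextFor(true, false);
        try {
            nonKeyed.registerEventTimeTimer(42L);
        } catch (UnsupportedOperationException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

In this model a function written for keyed inputs still runs on non-keyed
inputs as long as it only queries time; it fails at the first attempt to
register a timer rather than when the job graph is built.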


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06740fb2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06740fb2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06740fb2

Branch: refs/heads/master
Commit: 06740fb28d3f3269d040d68cbfefc5201203b60d
Parents: e12f320
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Mar 1 12:02:34 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Mar 6 12:26:16 2017 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        |  15 +-
 .../api/operators/co/CoProcessOperator.java     |  89 +--
 .../operators/co/KeyedCoProcessOperator.java    | 168 ++++++
 .../api/operators/co/CoProcessOperatorTest.java | 401 +-------------
 .../co/KeyedCoProcessOperatorTest.java          | 535 +++++++++++++++++++
 5 files changed, 742 insertions(+), 466 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 66601a7..9fe3a4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -29,9 +29,10 @@ import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
+import org.apache.flink.streaming.api.operators.co.KeyedCoProcessOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
 import static java.util.Objects.requireNonNull;
@@ -281,13 +282,13 @@ public class ConnectedStreams<IN1, IN2> {
 			CoProcessFunction<IN1, IN2, R> coProcessFunction,
 			TypeInformation<R> outputType) {
 
-		if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
-			throw new UnsupportedOperationException("A CoProcessFunction can only be applied" +
-					"when both input streams are keyed.");
-		}
+		TwoInputStreamOperator<IN1, IN2, R> operator;
 
-		CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
-				inputStream1.clean(coProcessFunction));
+		if ((inputStream1 instanceof KeyedStream) && (inputStream2 instanceof KeyedStream)) {
+			operator = new KeyedCoProcessOperator<>(inputStream1.clean(coProcessFunction));
+		} else {
+			operator = new CoProcessOperator<>(inputStream1.clean(coProcessFunction));
+		}
 
 		return transform("Co-Process", outputType, operator);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
index ed99815..4133e7b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -18,35 +18,32 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.TimerService;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
 import org.apache.flink.streaming.api.operators.InternalTimerService;
 import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
 @Internal
-public class CoProcessOperator<K, IN1, IN2, OUT>
+public class CoProcessOperator<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
-		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+		implements TwoInputStreamOperator<IN1, IN2, OUT> {
 
 	private static final long serialVersionUID = 1L;
 
 	private transient TimestampedCollector<OUT> collector;
 
-	private transient ContextImpl<IN1, IN2, OUT> context;
+	private transient ContextImpl context;
 
-	private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
+	/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
+	private long currentWatermark = Long.MIN_VALUE;
 
 	public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
@@ -57,13 +54,7 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		super.open();
 		collector = new TimestampedCollector<>(output);
 
-		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
-		TimerService timerService = new SimpleTimerService(internalTimerService);
-
-		context = new ContextImpl<>(userFunction, timerService);
-		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+		context = new ContextImpl(userFunction, getProcessingTimeService());
 	}
 
 	@Override
@@ -83,36 +74,20 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 	}
 
 	@Override
-	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
+	public void processWatermark(Watermark mark) throws Exception {
+		super.processWatermark(mark);
+		currentWatermark = mark.getTimestamp();
 	}
 
-	@Override
-	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
+	private class ContextImpl
+			extends CoProcessFunction<IN1, IN2, OUT>.Context
+			implements TimerService {
 
-	protected TimestampedCollector<OUT> getCollector() {
-		return collector;
-	}
-
-	private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
-
-		private final TimerService timerService;
+		private final ProcessingTimeService timerService;
 
 		private StreamRecord<?> element;
 
-		ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+		ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, ProcessingTimeService timerService) {
 			function.super();
 			this.timerService = checkNotNull(timerService);
 		}
@@ -129,40 +104,28 @@ public class CoProcessOperator<K, IN1, IN2, OUT>
 		}
 
 		@Override
-		public TimerService timerService() {
-			return timerService;
+		public long currentProcessingTime() {
+			return timerService.getCurrentProcessingTime();
 		}
-	}
 
-	private static class OnTimerContextImpl<IN1, IN2, OUT>
-			extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
-
-		private final TimerService timerService;
-
-		private TimeDomain timeDomain;
-
-		private InternalTimer<?, VoidNamespace> timer;
-
-		OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
-			function.super();
-			this.timerService = checkNotNull(timerService);
+		@Override
+		public long currentWatermark() {
+			return currentWatermark;
 		}
 
 		@Override
-		public TimeDomain timeDomain() {
-			checkState(timeDomain != null);
-			return timeDomain;
+		public void registerProcessingTimeTimer(long time) {
+			throw new UnsupportedOperationException("Setting timers is only supported on keyed streams.");
 		}
 
 		@Override
-		public Long timestamp() {
-			checkState(timer != null);
-			return timer.getTimestamp();
+		public void registerEventTimeTimer(long time) {
+			throw new UnsupportedOperationException("Setting timers is only supported on keyed streams.");
 		}
 
 		@Override
 		public TimerService timerService() {
-			return timerService;
+			return this;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
new file mode 100644
index 0000000..e721ab8
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperator.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class KeyedCoProcessOperator<K, IN1, IN2, OUT>
+		extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
+		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient ContextImpl<IN1, IN2, OUT> context;
+
+	private transient OnTimerContextImpl<IN1, IN2, OUT> onTimerContext;
+
+	public KeyedCoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
+		super(flatMapper);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+		TimerService timerService = new SimpleTimerService(internalTimerService);
+
+		context = new ContextImpl<>(userFunction, timerService);
+		onTimerContext = new OnTimerContextImpl<>(userFunction, timerService);
+	}
+
+	@Override
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement1(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	@Override
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement2(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	protected TimestampedCollector<OUT> getCollector() {
+		return collector;
+	}
+
+	private static class ContextImpl<IN1, IN2, OUT> extends CoProcessFunction<IN1, IN2, OUT>.Context {
+
+		private final TimerService timerService;
+
+		private StreamRecord<?> element;
+
+		ContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+			function.super();
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(element != null);
+
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+
+	private static class OnTimerContextImpl<IN1, IN2, OUT>
+			extends CoProcessFunction<IN1, IN2, OUT>.OnTimerContext {
+
+		private final TimerService timerService;
+
+		private TimeDomain timeDomain;
+
+		private InternalTimer<?, VoidNamespace> timer;
+
+		OnTimerContextImpl(CoProcessFunction<IN1, IN2, OUT> function, TimerService timerService) {
+			function.super();
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public TimeDomain timeDomain() {
+			checkState(timeDomain != null);
+			return timeDomain;
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(timer != null);
+			return timer.getTimestamp();
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
index 9d7c444..c19eb37 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoProcessOperatorTest.java
@@ -18,18 +18,12 @@
 package org.apache.flink.streaming.api.operators.co;
 
 
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.base.StringSerializer;
-import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.TimeDomain;
 import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
-import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
-import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -46,15 +40,11 @@ public class CoProcessOperatorTest extends TestLogger {
 	@Test
 	public void testTimestampAndWatermarkQuerying() throws Exception {
 
-		CoProcessOperator<String, Integer, String, String> operator =
+		CoProcessOperator<Integer, String, String> operator =
 				new CoProcessOperator<>(new WatermarkQueryingProcessFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
+				new TwoInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.setup();
 		testHarness.open();
@@ -82,15 +72,11 @@ public class CoProcessOperatorTest extends TestLogger {
 	@Test
 	public void testTimestampAndProcessingTimeQuerying() throws Exception {
 
-		CoProcessOperator<String, Integer, String, String> operator =
+		CoProcessOperator<Integer, String, String> operator =
 				new CoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
+				new TwoInputStreamOperatorTestHarness<>(operator);
 
 		testHarness.setup();
 		testHarness.open();
@@ -111,237 +97,6 @@ public class CoProcessOperatorTest extends TestLogger {
 		testHarness.close();
 	}
 
-	@Test
-	public void testEventTimeTimers() throws Exception {
-
-		CoProcessOperator<String, Integer, String, String> operator =
-				new CoProcessOperator<>(new EventTimeTriggeringProcessFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<>(17, 42L));
-		testHarness.processElement2(new StreamRecord<>("18", 42L));
-
-		testHarness.processWatermark1(new Watermark(5));
-		testHarness.processWatermark2(new Watermark(5));
-
-		testHarness.processWatermark1(new Watermark(6));
-		testHarness.processWatermark2(new Watermark(6));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
-		expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
-		expectedOutput.add(new StreamRecord<>("1777", 5L));
-		expectedOutput.add(new Watermark(5L));
-		expectedOutput.add(new StreamRecord<>("1777", 6L));
-		expectedOutput.add(new Watermark(6L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testProcessingTimeTimers() throws Exception {
-
-		CoProcessOperator<String, Integer, String, String> operator =
-				new CoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<>(17));
-		testHarness.processElement2(new StreamRecord<>("18"));
-
-		testHarness.setProcessingTime(5);
-		testHarness.setProcessingTime(6);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
-		expectedOutput.add(new StreamRecord<>("INPUT2:18"));
-		expectedOutput.add(new StreamRecord<>("1777", 5L));
-		expectedOutput.add(new StreamRecord<>("1777", 6L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testEventTimeTimerWithState() throws Exception {
-
-		CoProcessOperator<String, Integer, String, String> operator =
-				new CoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processWatermark1(new Watermark(1));
-		testHarness.processWatermark2(new Watermark(1));
-		testHarness.processElement1(new StreamRecord<>(17, 0L)); // should set timer for 6
-
-		testHarness.processWatermark1(new Watermark(2));
-		testHarness.processWatermark2(new Watermark(2));
-		testHarness.processElement2(new StreamRecord<>("42", 1L)); // should set timer for 7
-
-		testHarness.processWatermark1(new Watermark(6));
-		testHarness.processWatermark2(new Watermark(6));
-
-		testHarness.processWatermark1(new Watermark(7));
-		testHarness.processWatermark2(new Watermark(7));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new Watermark(1L));
-		expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
-		expectedOutput.add(new Watermark(2L));
-		expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new Watermark(6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-		expectedOutput.add(new Watermark(7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	/**
-	 * Verifies that we don't have leakage between different keys.
-	 */
-	@Test
-	public void testProcessingTimeTimerWithState() throws Exception {
-
-		CoProcessOperator<String, Integer, String, String> operator =
-				new CoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.setProcessingTime(1);
-		testHarness.processElement1(new StreamRecord<>(17)); // should set timer for 6
-
-		testHarness.setProcessingTime(2);
-		testHarness.processElement2(new StreamRecord<>("42")); // should set timer for 7
-
-		testHarness.setProcessingTime(6);
-		testHarness.setProcessingTime(7);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
-		expectedOutput.add(new StreamRecord<>("INPUT2:42"));
-		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
-		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-	@Test
-	public void testSnapshotAndRestore() throws Exception {
-
-		CoProcessOperator<String, Integer, String, String> operator =
-				new CoProcessOperator<>(new BothTriggeringProcessFunction());
-
-		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
-				new KeyedTwoInputStreamOperatorTestHarness<>(
-						operator,
-						new IntToStringKeySelector<>(),
-						new IdentityKeySelector<String>(),
-						BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.open();
-
-		testHarness.processElement1(new StreamRecord<>(5, 12L));
-		testHarness.processElement2(new StreamRecord<>("5", 12L));
-
-		// snapshot and restore from scratch
-		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
-
-		testHarness.close();
-
-		operator = new CoProcessOperator<>(new BothTriggeringProcessFunction());
-
-		testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
-				operator,
-				new IntToStringKeySelector<>(),
-				new IdentityKeySelector<String>(),
-				BasicTypeInfo.STRING_TYPE_INFO);
-
-		testHarness.setup();
-		testHarness.initializeState(snapshot);
-		testHarness.open();
-
-		testHarness.setProcessingTime(5);
-		testHarness.processWatermark1(new Watermark(6));
-		testHarness.processWatermark2(new Watermark(6));
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
-
-		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
-		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
-		expectedOutput.add(new Watermark(6));
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.close();
-	}
-
-
-	private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Integer value) throws Exception {
-			return "" + value;
-		}
-	}
-
-	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public T getKey(T value) throws Exception {
-			return value;
-		}
-	}
 
 	private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
 
@@ -365,92 +120,6 @@ public class CoProcessOperatorTest extends TestLogger {
 		}
 	}
 
-	private static class EventTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			ctx.timerService().registerEventTimeTimer(5);
-		}
-
-		@Override
-		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			ctx.timerService().registerEventTimeTimer(6);
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-
-			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
-			out.collect("" + 1777);
-		}
-	}
-
-	private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<String> state =
-				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
-
-		@Override
-		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			getRuntimeContext().getState(state).update("" + value);
-			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-		}
-
-		@Override
-		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			getRuntimeContext().getState(state).update(value);
-			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
-		}
-
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
-			out.collect("STATE:" + getRuntimeContext().getState(state).value());
-		}
-	}
-
-	private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			ctx.timerService().registerProcessingTimeTimer(5);
-		}
-
-		@Override
-		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			ctx.timerService().registerProcessingTimeTimer(6);
-		}
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-
-			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
-			out.collect("" + 1777);
-		}
-	}
-
 	private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
 
 		private static final long serialVersionUID = 1L;
@@ -472,64 +141,4 @@ public class CoProcessOperatorTest extends TestLogger {
 				Collector<String> out) throws Exception {
 		}
 	}
-
-	private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final ValueStateDescriptor<String> state =
-				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
-
-		@Override
-		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT1:" + value);
-			getRuntimeContext().getState(state).update("" + value);
-			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-		}
-
-		@Override
-		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
-			out.collect("INPUT2:" + value);
-			getRuntimeContext().getState(state).update(value);
-			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
-		}
-
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
-			out.collect("STATE:" + getRuntimeContext().getState(state).value());
-		}
-	}
-
-	private static class BothTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
-			ctx.timerService().registerEventTimeTimer(6);
-		}
-
-		@Override
-		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
-			ctx.timerService().registerProcessingTimeTimer(5);
-		}
-
-
-		@Override
-		public void onTimer(
-				long timestamp,
-				OnTimerContext ctx,
-				Collector<String> out) throws Exception {
-			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
-				out.collect("EVENT:1777");
-			} else {
-				out.collect("PROC:1777");
-			}
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/06740fb2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
new file mode 100644
index 0000000..d8c9a61
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/KeyedCoProcessOperatorTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators.co;
+
+
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
+import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests {@link KeyedCoProcessOperator}.
+ */
+public class KeyedCoProcessOperatorTest extends TestLogger {
+
+	@Test
+	public void testTimestampAndWatermarkQuerying() throws Exception {
+
+		KeyedCoProcessOperator<String, Integer, String, String> operator =
+				new KeyedCoProcessOperator<>(new WatermarkQueryingProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark1(new Watermark(17));
+		testHarness.processWatermark2(new Watermark(17));
+		testHarness.processElement1(new StreamRecord<>(5, 12L));
+
+		testHarness.processWatermark1(new Watermark(42));
+		testHarness.processWatermark2(new Watermark(42));
+		testHarness.processElement2(new StreamRecord<>("6", 13L));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(17L));
+		expectedOutput.add(new StreamRecord<>("5WM:17 TS:12", 12L));
+		expectedOutput.add(new Watermark(42L));
+		expectedOutput.add(new StreamRecord<>("6WM:42 TS:13", 13L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testTimestampAndProcessingTimeQuerying() throws Exception {
+
+		KeyedCoProcessOperator<String, Integer, String, String> operator =
+				new KeyedCoProcessOperator<>(new ProcessingTimeQueryingProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(17);
+		testHarness.processElement1(new StreamRecord<>(5));
+
+		testHarness.setProcessingTime(42);
+		testHarness.processElement2(new StreamRecord<>("6"));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("5PT:17 TS:null"));
+		expectedOutput.add(new StreamRecord<>("6PT:42 TS:null"));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testEventTimeTimers() throws Exception {
+
+		KeyedCoProcessOperator<String, Integer, String, String> operator =
+				new KeyedCoProcessOperator<>(new EventTimeTriggeringProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(17, 42L));
+		testHarness.processElement2(new StreamRecord<>("18", 42L));
+
+		testHarness.processWatermark1(new Watermark(5));
+		testHarness.processWatermark2(new Watermark(5));
+
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17", 42L));
+		expectedOutput.add(new StreamRecord<>("INPUT2:18", 42L));
+		expectedOutput.add(new StreamRecord<>("1777", 5L));
+		expectedOutput.add(new Watermark(5L));
+		expectedOutput.add(new StreamRecord<>("1777", 6L));
+		expectedOutput.add(new Watermark(6L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testProcessingTimeTimers() throws Exception {
+
+		KeyedCoProcessOperator<String, Integer, String, String> operator =
+				new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(17));
+		testHarness.processElement2(new StreamRecord<>("18"));
+
+		testHarness.setProcessingTime(5);
+		testHarness.setProcessingTime(6);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT2:18"));
+		expectedOutput.add(new StreamRecord<>("1777", 5L));
+		expectedOutput.add(new StreamRecord<>("1777", 6L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that keyed state does not leak between keys when event-time timers fire.
+	 */
+	@Test
+	public void testEventTimeTimerWithState() throws Exception {
+
+		KeyedCoProcessOperator<String, Integer, String, String> operator =
+				new KeyedCoProcessOperator<>(new EventTimeTriggeringStatefulProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processWatermark1(new Watermark(1));
+		testHarness.processWatermark2(new Watermark(1));
+		testHarness.processElement1(new StreamRecord<>(17, 0L)); // watermark is 1, so the timer is set for 1 + 5 = 6
+
+		testHarness.processWatermark1(new Watermark(2));
+		testHarness.processWatermark2(new Watermark(2));
+		testHarness.processElement2(new StreamRecord<>("42", 1L)); // watermark is 2, so the timer is set for 2 + 5 = 7
+
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		testHarness.processWatermark1(new Watermark(7));
+		testHarness.processWatermark2(new Watermark(7));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new Watermark(1L));
+		expectedOutput.add(new StreamRecord<>("INPUT1:17", 0L));
+		expectedOutput.add(new Watermark(2L));
+		expectedOutput.add(new StreamRecord<>("INPUT2:42", 1L));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new Watermark(6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+		expectedOutput.add(new Watermark(7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	/**
+	 * Verifies that keyed state does not leak between keys when processing-time timers fire.
+	 */
+	@Test
+	public void testProcessingTimeTimerWithState() throws Exception {
+
+		KeyedCoProcessOperator<String, Integer, String, String> operator =
+				new KeyedCoProcessOperator<>(new ProcessingTimeTriggeringStatefulProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.setProcessingTime(1);
+		testHarness.processElement1(new StreamRecord<>(17)); // processing time is 1, so the timer is set for 1 + 5 = 6
+
+		testHarness.setProcessingTime(2);
+		testHarness.processElement2(new StreamRecord<>("42")); // processing time is 2, so the timer is set for 2 + 5 = 7
+
+		testHarness.setProcessingTime(6);
+		testHarness.setProcessingTime(7);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("INPUT1:17"));
+		expectedOutput.add(new StreamRecord<>("INPUT2:42"));
+		expectedOutput.add(new StreamRecord<>("STATE:17", 6L));
+		expectedOutput.add(new StreamRecord<>("STATE:42", 7L));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+	@Test
+	public void testSnapshotAndRestore() throws Exception {
+
+		KeyedCoProcessOperator<String, Integer, String, String> operator =
+				new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
+
+		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
+				new KeyedTwoInputStreamOperatorTestHarness<>(
+						operator,
+						new IntToStringKeySelector<>(),
+						new IdentityKeySelector<String>(),
+						BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.open();
+
+		testHarness.processElement1(new StreamRecord<>(5, 12L));
+		testHarness.processElement2(new StreamRecord<>("5", 12L));
+
+		// take a snapshot, then restore it into a fresh operator and test harness
+		OperatorStateHandles snapshot = testHarness.snapshot(0, 0);
+
+		testHarness.close();
+
+		operator = new KeyedCoProcessOperator<>(new BothTriggeringProcessFunction());
+
+		testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
+				operator,
+				new IntToStringKeySelector<>(),
+				new IdentityKeySelector<String>(),
+				BasicTypeInfo.STRING_TYPE_INFO);
+
+		testHarness.setup();
+		testHarness.initializeState(snapshot);
+		testHarness.open();
+
+		testHarness.setProcessingTime(5);
+		testHarness.processWatermark1(new Watermark(6));
+		testHarness.processWatermark2(new Watermark(6));
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		expectedOutput.add(new StreamRecord<>("PROC:1777", 5L));
+		expectedOutput.add(new StreamRecord<>("EVENT:1777", 6L));
+		expectedOutput.add(new Watermark(6));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+		testHarness.close();
+	}
+
+
+	private static class IntToStringKeySelector<T> implements KeySelector<Integer, String> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Integer value) throws Exception {
+			return "" + value;
+		}
+	}
+
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
+	private static class WatermarkQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "WM:" + ctx.timerService().currentWatermark() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class EventTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			ctx.timerService().registerEventTimeTimer(5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			ctx.timerService().registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+
+			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+			out.collect("" + 1777);
+		}
+	}
+
+	private static class EventTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<String> state =
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			getRuntimeContext().getState(state).update("" + value);
+			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			getRuntimeContext().getState(state).update(value);
+			ctx.timerService().registerEventTimeTimer(ctx.timerService().currentWatermark() + 5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			assertEquals(TimeDomain.EVENT_TIME, ctx.timeDomain());
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class ProcessingTimeTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			ctx.timerService().registerProcessingTimeTimer(5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			ctx.timerService().registerProcessingTimeTimer(6);
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+
+			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+			out.collect("" + 1777);
+		}
+	}
+
+	private static class ProcessingTimeQueryingProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect(value + "PT:" + ctx.timerService().currentProcessingTime() + " TS:" + ctx.timestamp());
+		}
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+		}
+	}
+
+	private static class ProcessingTimeTriggeringStatefulProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ValueStateDescriptor<String> state =
+				new ValueStateDescriptor<>("seen-element", StringSerializer.INSTANCE);
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT1:" + value);
+			getRuntimeContext().getState(state).update("" + value);
+			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			out.collect("INPUT2:" + value);
+			getRuntimeContext().getState(state).update(value);
+			ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			assertEquals(TimeDomain.PROCESSING_TIME, ctx.timeDomain());
+			out.collect("STATE:" + getRuntimeContext().getState(state).value());
+		}
+	}
+
+	private static class BothTriggeringProcessFunction extends CoProcessFunction<Integer, String, String> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void processElement1(Integer value, Context ctx, Collector<String> out) throws Exception {
+			ctx.timerService().registerEventTimeTimer(6);
+		}
+
+		@Override
+		public void processElement2(String value, Context ctx, Collector<String> out) throws Exception {
+			ctx.timerService().registerProcessingTimeTimer(5);
+		}
+
+
+		@Override
+		public void onTimer(
+				long timestamp,
+				OnTimerContext ctx,
+				Collector<String> out) throws Exception {
+			if (TimeDomain.EVENT_TIME.equals(ctx.timeDomain())) {
+				out.collect("EVENT:1777");
+			} else {
+				out.collect("PROC:1777");
+			}
+		}
+	}
+}
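
Note on testEventTimeTimerWithState above: the expected output order follows from two rules, namely that state and timers are scoped to the current key, and that a timer fires before the watermark that triggers it is forwarded. The sketch below is a toy model of those two rules in plain Java, not Flink code. The class name KeyedTimerSketch and the "WM:" and "INPUT:" labels are made up for illustration, both inputs are collapsed into one processElement method, and Flink's deduplication of timers per key and timestamp is not modelled.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of keyed state plus event-time timers (hypothetical, not a Flink API). */
public class KeyedTimerSketch {

    // One state slot per key, mirroring a keyed ValueState<String>.
    private final Map<String, String> stateByKey = new HashMap<>();

    // Timers ordered by timestamp; each timer remembers the key that registered it.
    private final TreeMap<Long, List<String>> timers = new TreeMap<>();

    private final List<String> output = new ArrayList<>();

    private long watermark = Long.MIN_VALUE;

    /** Emits the element, stores it under its key, and sets a timer for watermark + 5. */
    void processElement(String key, String value) {
        output.add("INPUT:" + value);
        stateByKey.put(key, value);
        timers.computeIfAbsent(watermark + 5, t -> new ArrayList<>()).add(key);
    }

    /** Fires all timers that are due, then forwards the watermark. */
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.firstKey() <= watermark) {
            for (String key : timers.pollFirstEntry().getValue()) {
                // The timer reads only the state of the key it was registered for.
                output.add("STATE:" + stateByKey.get(key));
            }
        }
        output.add("WM:" + newWatermark);
    }

    /** Replays the sequence of watermarks and elements used in the test. */
    public static List<String> run() {
        KeyedTimerSketch op = new KeyedTimerSketch();
        op.advanceWatermark(1);
        op.processElement("17", "17"); // timer for 1 + 5 = 6
        op.advanceWatermark(2);
        op.processElement("42", "42"); // timer for 2 + 5 = 7
        op.advanceWatermark(6);
        op.advanceWatermark(7);
        return op.output;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the two elements shared a key, the second update would overwrite the first and both timers would report "STATE:42". The test uses distinct keys precisely so that each timer must report the value stored under its own key.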