You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/28 17:45:11 UTC

[2/2] flink git commit: [FLINK-5026] Rename TimelyFlatMap to Process

[FLINK-5026] Rename TimelyFlatMap to Process


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/910f733f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/910f733f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/910f733f

Branch: refs/heads/master
Commit: 910f733f5ec52d2dd1e9dcc4ec6a4844cae2f2b4
Parents: 3a27f55
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 11 10:57:25 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 28 18:38:29 2016 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        |  62 ++-
 .../streaming/api/datastream/KeyedStream.java   |  55 +-
 .../api/functions/ProcessFunction.java          | 109 ++++
 .../api/functions/RichProcessFunction.java      |  40 ++
 .../functions/RichTimelyFlatMapFunction.java    |  40 --
 .../api/functions/TimelyFlatMapFunction.java    | 110 ----
 .../api/functions/co/CoProcessFunction.java     | 130 +++++
 .../api/functions/co/RichCoProcessFunction.java |  41 ++
 .../co/RichTimelyCoFlatMapFunction.java         |  41 --
 .../functions/co/TimelyCoFlatMapFunction.java   | 128 -----
 .../api/operators/ProcessOperator.java          | 151 ++++++
 .../api/operators/StreamTimelyFlatMap.java      | 151 ------
 .../api/operators/co/CoProcessOperator.java     | 167 ++++++
 .../api/operators/co/CoStreamTimelyFlatMap.java | 167 ------
 .../flink/streaming/api/DataStreamTest.java     |  23 +-
 .../api/operators/ProcessOperatorTest.java      | 404 ++++++++++++++
 .../api/operators/TimelyFlatMapTest.java        | 404 --------------
 .../api/operators/co/CoProcessOperatorTest.java | 536 +++++++++++++++++++
 .../api/operators/co/TimelyCoFlatMapTest.java   | 536 -------------------
 .../streaming/api/scala/ConnectedStreams.scala  |  39 +-
 .../flink/streaming/api/scala/KeyedStream.scala |  24 +-
 .../streaming/api/scala/DataStreamTest.scala    |  20 +-
 22 files changed, 1701 insertions(+), 1677 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index dc763cb..96a08d3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -27,11 +27,12 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoProcessFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.co.CoProcessOperator;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
 import static java.util.Objects.requireNonNull;
@@ -234,64 +235,71 @@ public class ConnectedStreams<IN1, IN2> {
 	}
 
 	/**
-	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * Applies the given {@link CoProcessFunction} on the connected input streams,
 	 * thereby creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the streams and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero or
+	 * more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function, this
+	 * function can also query the time and set timers. When reacting to the firing of set timers
+	 * the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * <p>A {@link RichCoProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
-	public <R> SingleOutputStreamOperator<R> flatMap(
-			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(
+			CoProcessFunction<IN1, IN2, R> coProcessFunction) {
 
-		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
-				TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(),
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coProcessFunction,
+				CoProcessFunction.class, false, true, getType1(), getType2(),
 				Utils.getCallLocationName(), true);
 
-		return flatMap(coFlatMapper, outTypeInfo);
+		return process(coProcessFunction, outTypeInfo);
 	}
 
 	/**
-	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * Applies the given {@link CoProcessFunction} on the connected input streams,
 	 * thereby creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the streams and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link #flatMap(CoFlatMapFunction)} function,
+	 * this function can also query the time and set timers. When reacting to the firing of set
+	 * timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * <p>A {@link RichCoProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 * @param coProcessFunction The {@link CoProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code CoProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
 	@Internal
-	public <R> SingleOutputStreamOperator<R> flatMap(
-			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+	public <R> SingleOutputStreamOperator<R> process(
+			CoProcessFunction<IN1, IN2, R> coProcessFunction,
 			TypeInformation<R> outputType) {
 
-		CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>(
-				inputStream1.clean(coFlatMapper));
+		if (!(inputStream1 instanceof KeyedStream) || !(inputStream2 instanceof KeyedStream)) {
+			throw new UnsupportedOperationException("A CoProcessFunction can only be applied" +
+					"when both input streams are keyed.");
+		}
 
-		return transform("Co-Flat Map", outputType, operator);
-	}
+		CoProcessOperator<Object, IN1, IN2, R> operator = new CoProcessOperator<>(
+				inputStream1.clean(coProcessFunction));
 
+		return transform("Co-Process", outputType, operator);
+	}
 
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> transform(String functionName,

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 5b00bcd..560ecab 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
@@ -32,7 +33,8 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.streaming.api.TimeCharacteristic;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.functions.RichProcessFunction;
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
 import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
@@ -42,7 +44,7 @@ import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamGroupedFold;
 import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.transformations.OneInputTransformation;
 import org.apache.flink.streaming.api.transformations.PartitionTransformation;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
@@ -173,67 +175,70 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
 	 * creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the stream and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+	 * function, this function can also query the time and set timers. When reacting to the firing
+	 * of set timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+	 * <p>A {@link RichProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
 	 *                      in the stream.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
-	public <R> SingleOutputStreamOperator<R> flatMap(TimelyFlatMapFunction<T, R> flatMapper) {
+	@PublicEvolving
+	public <R> SingleOutputStreamOperator<R> process(ProcessFunction<T, R> processFunction) {
 
 		TypeInformation<R> outType = TypeExtractor.getUnaryOperatorReturnType(
-				flatMapper,
-				TimelyFlatMapFunction.class,
+				processFunction,
+				ProcessFunction.class,
 				false,
 				true,
 				getType(),
 				Utils.getCallLocationName(),
 				true);
 
-		return flatMap(flatMapper, outType);
+		return process(processFunction, outType);
 	}
 
 	/**
-	 * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+	 * Applies the given {@link ProcessFunction} on the input stream, thereby
 	 * creating a transformed output stream.
 	 *
-	 * <p>The function will be called for every element in the stream and can produce
-	 * zero or more output. The function can also query the time and set timers. When
-	 * reacting to the firing of set timers the function can emit yet more elements.
+	 * <p>The function will be called for every element in the input streams and can produce zero
+	 * or more output elements. Contrary to the {@link DataStream#flatMap(FlatMapFunction)}
+	 * function, this function can also query the time and set timers. When reacting to the firing
+	 * of set timers the function can directly emit elements and/or register yet more timers.
 	 *
-	 * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+	 * <p>A {@link RichProcessFunction}
 	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+	 * @param processFunction The {@link ProcessFunction} that is called for each element
 	 *                      in the stream.
 	 * @param outputType {@link TypeInformation} for the result type of the function.
 	 *
-	 * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+	 * @param <R> The type of elements emitted by the {@code ProcessFunction}.
 	 *
 	 * @return The transformed {@link DataStream}.
 	 */
 	@Internal
-	public <R> SingleOutputStreamOperator<R> flatMap(
-			TimelyFlatMapFunction<T, R> flatMapper,
+	public <R> SingleOutputStreamOperator<R> process(
+			ProcessFunction<T, R> processFunction,
 			TypeInformation<R> outputType) {
 
-		StreamTimelyFlatMap<KEY, T, R> operator =
-				new StreamTimelyFlatMap<>(clean(flatMapper));
+		ProcessOperator<KEY, T, R> operator =
+				new ProcessOperator<>(clean(processFunction));
 
-		return transform("Flat Map", outputType, operator);
+		return transform("Process", outputType, operator);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
new file mode 100644
index 0000000..fd0a829
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+/**
+ * A function that processes elements of a stream.
+ *
+ * <p>The function will be called for every element in the input stream and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>The function will be called for every element in the input stream and can produce
+ * zero or more output elements. Contrary to the
+ * {@link org.apache.flink.api.common.functions.FlatMapFunction}, this function can also query
+ * the time (both event and processing) and set timers, through the provided {@link Context}.
+ * When reacting to the firing of set timers the function can directly emit a result, and/or
+ * register a timer that will trigger an action in the future.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the output elements.
+ */
+@PublicEvolving
+public interface ProcessFunction<I, O> extends Function {
+
+	/**
+	 * Process one element from the input stream.
+	 *
+	 * <p>This function can output zero or more elements using the {@link Collector} parameter
+	 * and also update internal state or set timers using the {@link Context} parameter.
+	 *
+	 * @param value The input value.
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
+	 *            a {@link TimerService} for registering timers and querying the time. The
+	 *            context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void processElement(I value, Context ctx, Collector<O> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
+
+	/**
+	 * Information available in an invocation of {@link #processElement(Object, Context, Collector)}
+	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface Context {
+
+		/**
+		 * Timestamp of the element currently being processed or timestamp of a firing timer.
+		 *
+		 * <p>This might be {@code null}, for example if the time characteristic of your program
+		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+		 */
+		Long timestamp();
+
+		/**
+		 * A {@link TimerService} for querying time and registering timers.
+		 */
+		TimerService timerService();
+	}
+
+	/**
+	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface OnTimerContext extends Context {
+		/**
+		 * The {@link TimeDomain} of the firing timer.
+		 */
+		TimeDomain timeDomain();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
new file mode 100644
index 0000000..834f717
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichProcessFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+
+/**
+ * Rich variant of the {@link ProcessFunction}. As a
+ * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
+ * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
+ * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
+ *
+ * @param <I> Type of the input elements.
+ * @param <O> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichProcessFunction<I, O>
+		extends AbstractRichFunction
+		implements ProcessFunction<I, O> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
deleted file mode 100644
index 0d86da9..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/RichTimelyFlatMapFunction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-
-/**
- * Rich variant of the {@link TimelyFlatMapFunction}. As a
- * {@link org.apache.flink.api.common.functions.RichFunction}, it gives access to the
- * {@link org.apache.flink.api.common.functions.RuntimeContext} and provides setup and teardown methods:
- * {@link org.apache.flink.api.common.functions.RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link org.apache.flink.api.common.functions.RichFunction#close()}.
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyFlatMapFunction<I, O>
-		extends AbstractRichFunction
-		implements TimelyFlatMapFunction<I, O> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
deleted file mode 100644
index 5f039c4..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
- * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
- * and arrays.
- *
- * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
- * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
- * to them firing.
- *
- * <pre>{@code
- * DataStream<X> input = ...;
- *
- * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
- * }</pre>
- *
- * @param <I> Type of the input elements.
- * @param <O> Type of the returned elements.
- */
-@PublicEvolving
-public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
-
-	/**
-	 * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
-	 * it into zero, one, or more elements.
-	 *
-	 * @param value The input value.
-	 * @param ctx A {@link Context} that allows querying the timestamp of the element and getting
-	 *            a {@link TimerService} for registering timers and querying the time. The
-	 *            context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector for returning result values.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	void flatMap(I value, Context ctx, Collector<O> out) throws Exception;
-
-	/**
-	 * Called when a timer set using {@link TimerService} fires.
-	 *
-	 * @param timestamp The timestamp of the firing timer.
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector for returning result values.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception ;
-
-	/**
-	 * Information available in an invocation of {@link #flatMap(Object, Context, Collector)}
-	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface Context {
-
-		/**
-		 * Timestamp of the element currently being processed or timestamp of a firing timer.
-		 *
-		 * <p>This might be {@code null}, for example if the time characteristic of your program
-		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
-		 */
-		Long timestamp();
-
-		/**
-		 * A {@link TimerService} for querying time and registering timers.
-		 */
-		TimerService timerService();
-	}
-
-	/**
-	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface OnTimerContext extends Context {
-		/**
-		 * The {@link TimeDomain} of the firing timer.
-		 */
-		TimeDomain timeDomain();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
new file mode 100644
index 0000000..feff8fb
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * A function that processes elements of two streams and produces a single output one.
+ *
+ * <p>The function will be called for every element in the input streams and can produce
+ * zero or more output elements. Contrary to the {@link CoFlatMapFunction}, this function can also
+ * query the time (both event and processing) and set timers, through the provided {@link Context}.
+ * When reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>An example use-case for connected streams would be the application of a set of rules that change
+ * over time ({@code stream A}) to the elements contained in another stream (stream {@code B}). The rules
+ * contained in {@code stream A} can be stored in the state and wait for new elements to arrive on
+ * {@code stream B}. Upon reception of a new element on {@code stream B}, the function can now apply the
+ * previously stored rules to the element and directly emit a result, and/or register a timer that
+ * will trigger an action in the future.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Output type.
+ */
+@PublicEvolving
+public interface CoProcessFunction<IN1, IN2, OUT> extends Function, Serializable {
+
+	/**
+	 * This method is called for each element in the first of the connected streams.
+	 *
+	 * This function can output zero or more elements using the {@link Collector} parameter
+	 * and also update internal state or set timers using the {@link Context} parameter.
+	 * 
+	 * @param value The stream element
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
+	void processElement1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * This method is called for each element in the second of the connected streams.
+	 *
+	 * This function can output zero or more elements using the {@link Collector} parameter
+	 * and also update internal state or set timers using the {@link Context} parameter.
+	 * 
+	 * @param value The stream element
+	 * @param ctx A {@link Context} that allows querying the timestamp of the element,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector to emit resulting elements to
+	 * @throws Exception The function may throw exceptions which cause the streaming program
+	 *                   to fail and go into recovery.
+	 */
+	void processElement2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
+
+	/**
+	 * Called when a timer set using {@link TimerService} fires.
+	 *
+	 * @param timestamp The timestamp of the firing timer.
+	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
+	 *            querying the {@link TimeDomain} of the firing timer and getting a
+	 *            {@link TimerService} for registering timers and querying the time.
+	 *            The context is only valid during the invocation of this method, do not store it.
+	 * @param out The collector for returning result values.
+	 *
+	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
+	 *                   to fail and may trigger recovery.
+	 */
+	void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
+
+	/**
+	 * Information available in an invocation of {@link #processElement1(Object, Context, Collector)}/
+	 * {@link #processElement2(Object, Context, Collector)}
+	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface Context {
+
+		/**
+		 * Timestamp of the element currently being processed or timestamp of a firing timer.
+		 *
+		 * <p>This might be {@code null}, for example if the time characteristic of your program
+		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+		 */
+		Long timestamp();
+
+		/**
+		 * A {@link TimerService} for querying time and registering timers.
+		 */
+		TimerService timerService();
+	}
+
+	/**
+	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
+	 */
+	interface OnTimerContext extends Context {
+		/**
+		 * The {@link TimeDomain} of the firing timer.
+		 */
+		TimeDomain timeDomain();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
new file mode 100644
index 0000000..0fcea91
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichCoProcessFunction.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions.co;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+
+/**
+ * Rich variant of the {@link CoProcessFunction}. As a {@link RichFunction}, it gives
+ * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
+ * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
+ * and {@link RichFunction#close()}.
+ *
+ * @param <IN1> Type of the first input.
+ * @param <IN2> Type of the second input.
+ * @param <OUT> Type of the returned elements.
+ */
+@PublicEvolving
+public abstract class RichCoProcessFunction<IN1, IN2, OUT>
+		extends AbstractRichFunction
+		implements CoProcessFunction<IN1, IN2, OUT> {
+
+	private static final long serialVersionUID = 1L;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
deleted file mode 100644
index 12fe181..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/RichTimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.AbstractRichFunction;
-import org.apache.flink.api.common.functions.RichFunction;
-
-/**
- * Rich variant of the {@link TimelyCoFlatMapFunction}. As a {@link RichFunction}, it gives
- * access to the {@link org.apache.flink.api.common.functions.RuntimeContext} and provides
- * setup and teardown methods: {@link RichFunction#open(org.apache.flink.configuration.Configuration)}
- * and {@link RichFunction#close()}.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Type of the returned elements.
- */
-@PublicEvolving
-public abstract class RichTimelyCoFlatMapFunction<IN1, IN2, OUT>
-		extends AbstractRichFunction
-		implements TimelyCoFlatMapFunction<IN1, IN2, OUT> {
-
-	private static final long serialVersionUID = 1L;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
deleted file mode 100644
index 89c7d79..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/co/TimelyCoFlatMapFunction.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.functions.co;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.util.Collector;
-
-import java.io.Serializable;
-
-/**
- * A {@code TimelyCoFlatMapFunction} implements a flat-map transformation over two
- * connected streams.
- * 
- * <p>The same instance of the transformation function is used to transform
- * both of the connected streams. That way, the stream transformations can
- * share state.
- *
- * <p>A {@code TimelyCoFlatMapFunction} can, in addition to the functionality of a normal
- * {@link CoFlatMapFunction}, also set timers and react to them firing.
- * 
- * <p>An example for the use of connected streams would be to apply rules that change over time
- * onto elements of a stream. One of the connected streams has the rules, the other stream the
- * elements to apply the rules to. The operation on the connected stream maintains the 
- * current set of rules in the state. It may receive either a rule update (from the first stream)
- * and update the state, or a data element (from the second stream) and apply the rules in the
- * state to the element. The result of applying the rules would be emitted.
- *
- * @param <IN1> Type of the first input.
- * @param <IN2> Type of the second input.
- * @param <OUT> Output type.
- */
-@PublicEvolving
-public interface TimelyCoFlatMapFunction<IN1, IN2, OUT> extends Function, Serializable {
-
-	/**
-	 * This method is called for each element in the first of the connected streams.
-	 * 
-	 * @param value The stream element
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector to emit resulting elements to
-	 * @throws Exception The function may throw exceptions which cause the streaming program
-	 *                   to fail and go into recovery.
-	 */
-	void flatMap1(IN1 value, Context ctx, Collector<OUT> out) throws Exception;
-
-	/**
-	 * This method is called for each element in the second of the connected streams.
-	 * 
-	 * @param value The stream element
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the element,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector to emit resulting elements to
-	 * @throws Exception The function may throw exceptions which cause the streaming program
-	 *                   to fail and go into recovery.
-	 */
-	void flatMap2(IN2 value, Context ctx, Collector<OUT> out) throws Exception;
-
-	/**
-	 * Called when a timer set using {@link TimerService} fires.
-	 *
-	 * @param timestamp The timestamp of the firing timer.
-	 * @param ctx An {@link OnTimerContext} that allows querying the timestamp of the firing timer,
-	 *            querying the {@link TimeDomain} of the firing timer and getting a
-	 *            {@link TimerService} for registering timers and querying the time.
-	 *            The context is only valid during the invocation of this method, do not store it.
-	 * @param out The collector for returning result values.
-	 *
-	 * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
-	 *                   to fail and may trigger recovery.
-	 */
-	void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception ;
-
-	/**
-	 * Information available in an invocation of {@link #flatMap1(Object, Context, Collector)}/
-	 * {@link #flatMap2(Object, Context, Collector)}
-	 * or {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface Context {
-
-		/**
-		 * Timestamp of the element currently being processed or timestamp of a firing timer.
-		 *
-		 * <p>This might be {@code null}, for example if the time characteristic of your program
-		 * is set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
-		 */
-		Long timestamp();
-
-		/**
-		 * A {@link TimerService} for querying time and registering timers.
-		 */
-		TimerService timerService();
-	}
-
-	/**
-	 * Information available in an invocation of {@link #onTimer(long, OnTimerContext, Collector)}.
-	 */
-	interface OnTimerContext extends TimelyFlatMapFunction.Context {
-		/**
-		 * The {@link TimeDomain} of the firing timer.
-		 */
-		TimeDomain timeDomain();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
new file mode 100644
index 0000000..3b13360
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/ProcessOperator.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class ProcessOperator<K, IN, OUT>
+		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
+		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient TimerService timerService;
+
+	private transient ContextImpl<IN> context;
+
+	private transient OnTimerContextImpl onTimerContext;
+
+	public ProcessOperator(ProcessFunction<IN, OUT> flatMapper) {
+		super(flatMapper);
+
+		chainingStrategy = ChainingStrategy.ALWAYS;
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+		this.timerService = new SimpleTimerService(internalTimerService);
+
+		context = new ContextImpl<>(timerService);
+		onTimerContext = new OnTimerContextImpl(timerService);
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	private static class ContextImpl<T> implements ProcessFunction.Context {
+
+		private final TimerService timerService;
+
+		private StreamRecord<T> element;
+
+		ContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(element != null);
+
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+
+	private static class OnTimerContextImpl implements ProcessFunction.OnTimerContext{
+
+		private final TimerService timerService;
+
+		private TimeDomain timeDomain;
+
+		private InternalTimer<?, VoidNamespace> timer;
+
+		OnTimerContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public TimeDomain timeDomain() {
+			checkState(timeDomain != null);
+			return timeDomain;
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(timer != null);
+			return timer.getTimestamp();
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
deleted file mode 100644
index bafc435..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class StreamTimelyFlatMap<K, IN, OUT>
-		extends AbstractUdfStreamOperator<OUT, TimelyFlatMapFunction<IN, OUT>>
-		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient TimestampedCollector<OUT> collector;
-
-	private transient TimerService timerService;
-
-	private transient ContextImpl<IN> context;
-
-	private transient OnTimerContextImpl onTimerContext;
-
-	public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
-		super(flatMapper);
-
-		chainingStrategy = ChainingStrategy.ALWAYS;
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-
-		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
-		this.timerService = new SimpleTimerService(internalTimerService);
-
-		context = new ContextImpl<>(timerService);
-		onTimerContext = new OnTimerContextImpl(timerService);
-	}
-
-	@Override
-	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	@Override
-	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	@Override
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		collector.setTimestamp(element);
-		context.element = element;
-		userFunction.flatMap(element.getValue(), context, collector);
-		context.element = null;
-	}
-
-	private static class ContextImpl<T> implements TimelyFlatMapFunction.Context {
-
-		private final TimerService timerService;
-
-		private StreamRecord<T> element;
-
-		ContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(element != null);
-
-			if (element.hasTimestamp()) {
-				return element.getTimestamp();
-			} else {
-				return null;
-			}
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-
-	private static class OnTimerContextImpl implements TimelyFlatMapFunction.OnTimerContext{
-
-		private final TimerService timerService;
-
-		private TimeDomain timeDomain;
-
-		private InternalTimer<?, VoidNamespace> timer;
-
-		OnTimerContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public TimeDomain timeDomain() {
-			checkState(timeDomain != null);
-			return timeDomain;
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(timer != null);
-			return timer.getTimestamp();
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
new file mode 100644
index 0000000..e6c3d3f
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoProcessOperator.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.co;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.VoidNamespace;
+import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.streaming.api.SimpleTimerService;
+import org.apache.flink.streaming.api.TimeDomain;
+import org.apache.flink.streaming.api.TimerService;
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.InternalTimer;
+import org.apache.flink.streaming.api.operators.InternalTimerService;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.operators.Triggerable;
+import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+@Internal
+public class CoProcessOperator<K, IN1, IN2, OUT>
+		extends AbstractUdfStreamOperator<OUT, CoProcessFunction<IN1, IN2, OUT>>
+		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
+
+	private static final long serialVersionUID = 1L;
+
+	private transient TimestampedCollector<OUT> collector;
+
+	private transient TimerService timerService;
+
+	private transient ContextImpl context;
+
+	private transient OnTimerContextImpl onTimerContext;
+
+	public CoProcessOperator(CoProcessFunction<IN1, IN2, OUT> flatMapper) {
+		super(flatMapper);
+	}
+
+	@Override
+	public void open() throws Exception {
+		super.open();
+		collector = new TimestampedCollector<>(output);
+
+		InternalTimerService<VoidNamespace> internalTimerService =
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
+
+		this.timerService = new SimpleTimerService(internalTimerService);
+
+		context = new ContextImpl(timerService);
+		onTimerContext = new OnTimerContextImpl(timerService);
+	}
+
+	@Override
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement1(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	@Override
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
+		collector.setTimestamp(element);
+		context.element = element;
+		userFunction.processElement2(element.getValue(), context, collector);
+		context.element = null;
+	}
+
+	@Override
+	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	@Override
+	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
+		collector.setAbsoluteTimestamp(timer.getTimestamp());
+		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
+		onTimerContext.timer = timer;
+		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
+		onTimerContext.timeDomain = null;
+		onTimerContext.timer = null;
+	}
+
+	protected TimestampedCollector<OUT> getCollector() {
+		return collector;
+	}
+
+	private static class ContextImpl implements CoProcessFunction.Context {
+
+		private final TimerService timerService;
+
+		private StreamRecord<?> element;
+
+		ContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(element != null);
+
+			if (element.hasTimestamp()) {
+				return element.getTimestamp();
+			} else {
+				return null;
+			}
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+
+	private static class OnTimerContextImpl implements CoProcessFunction.OnTimerContext {
+
+		private final TimerService timerService;
+
+		private TimeDomain timeDomain;
+
+		private InternalTimer<?, VoidNamespace> timer;
+
+		OnTimerContextImpl(TimerService timerService) {
+			this.timerService = checkNotNull(timerService);
+		}
+
+		@Override
+		public TimeDomain timeDomain() {
+			checkState(timeDomain != null);
+			return timeDomain;
+		}
+
+		@Override
+		public Long timestamp() {
+			checkState(timer != null);
+			return timer.getTimestamp();
+		}
+
+		@Override
+		public TimerService timerService() {
+			return timerService;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
deleted file mode 100644
index 75e4c14..0000000
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators.co;
-
-import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.state.VoidNamespace;
-import org.apache.flink.runtime.state.VoidNamespaceSerializer;
-import org.apache.flink.streaming.api.SimpleTimerService;
-import org.apache.flink.streaming.api.TimeDomain;
-import org.apache.flink.streaming.api.TimerService;
-import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
-import org.apache.flink.streaming.api.operators.InternalTimer;
-import org.apache.flink.streaming.api.operators.InternalTimerService;
-import org.apache.flink.streaming.api.operators.TimestampedCollector;
-import org.apache.flink.streaming.api.operators.Triggerable;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
-
-@Internal
-public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
-		extends AbstractUdfStreamOperator<OUT, TimelyCoFlatMapFunction<IN1, IN2, OUT>>
-		implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<K, VoidNamespace> {
-
-	private static final long serialVersionUID = 1L;
-
-	private transient TimestampedCollector<OUT> collector;
-
-	private transient TimerService timerService;
-
-	private transient ContextImpl context;
-
-	private transient OnTimerContextImpl onTimerContext;
-
-	public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
-		super(flatMapper);
-	}
-
-	@Override
-	public void open() throws Exception {
-		super.open();
-		collector = new TimestampedCollector<>(output);
-
-		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
-
-		this.timerService = new SimpleTimerService(internalTimerService);
-
-		context = new ContextImpl(timerService);
-		onTimerContext = new OnTimerContextImpl(timerService);
-	}
-
-	@Override
-	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		collector.setTimestamp(element);
-		context.element = element;
-		userFunction.flatMap1(element.getValue(), context, collector);
-		context.element = null;
-	}
-
-	@Override
-	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		collector.setTimestamp(element);
-		context.element = element;
-		userFunction.flatMap2(element.getValue(), context, collector);
-		context.element = null;
-	}
-
-	@Override
-	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.EVENT_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	@Override
-	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-		collector.setAbsoluteTimestamp(timer.getTimestamp());
-		onTimerContext.timeDomain = TimeDomain.PROCESSING_TIME;
-		onTimerContext.timer = timer;
-		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
-		onTimerContext.timeDomain = null;
-		onTimerContext.timer = null;
-	}
-
-	protected TimestampedCollector<OUT> getCollector() {
-		return collector;
-	}
-
-	private static class ContextImpl implements TimelyCoFlatMapFunction.Context {
-
-		private final TimerService timerService;
-
-		private StreamRecord<?> element;
-
-		ContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(element != null);
-
-			if (element.hasTimestamp()) {
-				return element.getTimestamp();
-			} else {
-				return null;
-			}
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-
-	private static class OnTimerContextImpl implements TimelyCoFlatMapFunction.OnTimerContext {
-
-		private final TimerService timerService;
-
-		private TimeDomain timeDomain;
-
-		private InternalTimer<?, VoidNamespace> timer;
-
-		OnTimerContextImpl(TimerService timerService) {
-			this.timerService = checkNotNull(timerService);
-		}
-
-		@Override
-		public TimeDomain timeDomain() {
-			checkState(timeDomain != null);
-			return timeDomain;
-		}
-
-		@Override
-		public Long timestamp() {
-			checkState(timer != null);
-			return timer.getTimestamp();
-		}
-
-		@Override
-		public TimerService timerService() {
-			return timerService;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/910f733f/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 8f002ba..eaac6b8 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,7 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -47,7 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
+import org.apache.flink.streaming.api.operators.ProcessOperator;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -547,18 +547,19 @@ public class DataStreamTest {
 	}
 
 	/**
-	 * Verify that a timely flat map call is correctly translated to an operator.
+	 * Verify that a {@link KeyedStream#process(ProcessFunction)} call is correctly translated to
+	 * an operator.
 	 */
 	@Test
-	public void testTimelyFlatMapTranslation() {
+	public void testProcessTranslation() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		DataStreamSource<Long> src = env.generateSequence(0, 0);
 
-		TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
+		ProcessFunction<Long, Integer> processFunction = new ProcessFunction<Long, Integer>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
-			public void flatMap(
+			public void processElement(
 					Long value,
 					Context ctx,
 					Collector<Integer> out) throws Exception {
@@ -574,14 +575,14 @@ public class DataStreamTest {
 			}
 		};
 
-		DataStream<Integer> flatMapped = src
+		DataStream<Integer> processed = src
 				.keyBy(new IdentityKeySelector<Long>())
-				.flatMap(timelyFlatMapFunction);
+				.process(processFunction);
 
-		flatMapped.addSink(new DiscardingSink<Integer>());
+		processed.addSink(new DiscardingSink<Integer>());
 
-		assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped));
-		assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap);
+		assertEquals(processFunction, getFunctionForDataStream(processed));
+		assertTrue(getOperatorForDataStream(processed) instanceof ProcessOperator);
 	}
 
 	@Test