You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/07 15:26:29 UTC

[1/6] flink git commit: [FLINK-4957] Provide API for TimelyCoFlatMapFunction

Repository: flink
Updated Branches:
  refs/heads/master 718f6e4e3 -> 891950eab


[FLINK-4957] Provide API for TimelyCoFlatMapFunction


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891950ea
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891950ea
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891950ea

Branch: refs/heads/master
Commit: 891950eabaaed1fdfc1c0c88806f1125b085c4b6
Parents: 0b873ac
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 14:36:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../api/datastream/ConnectedStreams.java        | 63 ++++++++++++++++++++
 .../streaming/api/scala/ConnectedStreams.scala  | 30 +++++++++-
 2 files changed, 92 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index 50ef95b..dc763cb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -26,9 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.TimelyCoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.operators.co.CoStreamFlatMap;
 import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.operators.co.CoStreamTimelyFlatMap;
 import org.apache.flink.streaming.api.transformations.TwoInputTransformation;
 
 import static java.util.Objects.requireNonNull;
@@ -230,6 +233,66 @@ public class ConnectedStreams<IN1, IN2> {
 		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
 	}
 
+	/**
+	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * thereby creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the streams and can produce
+	 * zero or more output. The function can also query the time and set timers. When
+	 * reacting to the firing of set timers the function can emit yet more elements.
+	 *
+	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * can be used to gain access to features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 *                      in the stream.
+	 *
+	 * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R> flatMap(
+			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper) {
+
+		TypeInformation<R> outTypeInfo = TypeExtractor.getBinaryOperatorReturnType(coFlatMapper,
+				TimelyCoFlatMapFunction.class, false, true, getType1(), getType2(),
+				Utils.getCallLocationName(), true);
+
+		return flatMap(coFlatMapper, outTypeInfo);
+	}
+
+	/**
+	 * Applies the given {@link TimelyCoFlatMapFunction} on the connected input streams,
+	 * thereby creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the streams and can produce
+	 * zero or more output. The function can also query the time and set timers. When
+	 * reacting to the firing of set timers the function can emit yet more elements.
+	 *
+	 * <p>A {@link org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction}
+	 * can be used to gain access to features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param coFlatMapper The {@link TimelyCoFlatMapFunction} that is called for each element
+	 *                      in the stream.
+	 *
+	 * @param <R> The of elements emitted by the {@code TimelyCoFlatMapFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> flatMap(
+			TimelyCoFlatMapFunction<IN1, IN2, R> coFlatMapper,
+			TypeInformation<R> outputType) {
+
+		CoStreamTimelyFlatMap<Object, IN1, IN2, R> operator = new CoStreamTimelyFlatMap<>(
+				inputStream1.clean(coFlatMapper));
+
+		return transform("Co-Flat Map", outputType, operator);
+	}
+
+
 	@PublicEvolving
 	public <R> SingleOutputStreamOperator<R> transform(String functionName,
 			TypeInformation<R> outTypeInfo,

http://git-wip-us.apache.org/repos/asf/flink/blob/891950ea/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 141625e..50526b5 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
-import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
+import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction, TimelyCoFlatMapFunction}
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator
 import org.apache.flink.util.Collector
 
@@ -101,6 +101,34 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
   /**
+   * Applies the given [[TimelyCoFlatMapFunction]] on the connected input streams,
+   * thereby creating a transformed output stream.
+   *
+   * The function will be called for every element in the streams and can produce
+   * zero or more output. The function can also query the time and set timers. When
+   * reacting to the firing of set timers the function can emit yet more elements.
+   *
+   * A [[org.apache.flink.streaming.api.functions.co.RichTimelyCoFlatMapFunction]]
+   * can be used to gain access to features provided by the
+   * [[org.apache.flink.api.common.functions.RichFunction]] interface.
+   *
+   * @param coFlatMapper The [[TimelyCoFlatMapFunction]] that is called for each element
+    *                     in the stream.
+    *
+   * @return The transformed { @link DataStream}.
+   */
+  def flatMap[R: TypeInformation](
+      coFlatMapper: TimelyCoFlatMapFunction[IN1, IN2, R]) : DataStream[R] = {
+
+    if (coFlatMapper == null) throw new NullPointerException("FlatMap function must not be null.")
+
+    val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
+
+    asScalaStream(javaStream.flatMap(coFlatMapper, outType))
+  }
+
+
+  /**
    * Applies a CoFlatMap transformation on these connected streams.
    *
    * The transformation calls [[CoFlatMapFunction#flatMap1]] for each element


[3/6] flink git commit: [hotfix] Add Check for Keyed Operator in getInternalTimerService()

Posted by al...@apache.org.
[hotfix] Add Check for Keyed Operator in getInternalTimerService()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/132d8f14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/132d8f14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/132d8f14

Branch: refs/heads/master
Commit: 132d8f146c111ca7acf890fc2e2950e4931b2c7b
Parents: 718f6e4
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Mon Nov 7 11:52:38 2016 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/operators/AbstractStreamOperator.java     | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/132d8f14/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index a659866..7b555b7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -784,6 +784,9 @@ public abstract class AbstractStreamOperator<OUT>
 			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
 			Triggerable<K, N> triggerable) {
+		if (getKeyedStateBackend() == null) {
+			throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
+		}
 
 		@SuppressWarnings("unchecked")
 		HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name);


[6/6] flink git commit: [FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)

Posted by al...@apache.org.
[FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0ef3703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0ef3703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0ef3703

Branch: refs/heads/master
Commit: f0ef370399638689c2e1adc54a3acf0afab67a17
Parents: b9173b3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 13:47:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java   | 31 ++++++++++++++++--
 .../flink/streaming/api/scala/KeyedStream.scala | 33 +++++++++++++++++++-
 2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 922ad20..c938f5b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -202,11 +202,38 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 				Utils.getCallLocationName(),
 				true);
 
+		return flatMap(flatMapper, outType);
+	}
+
+	/**
+	 * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+	 * creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the stream and can produce
+	 * zero or more output. The function can also query the time and set timers. When
+	 * reacting to the firing of set timers the function can emit yet more elements.
+	 *
+	 * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+	 * can be used to gain access to features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+	 *                      in the stream.
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> flatMap(
+			TimelyFlatMapFunction<T, R> flatMapper,
+			TypeInformation<R> outputType) {
+
 		StreamTimelyFlatMap<KEY, T, R> operator =
 				new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
 
-		return transform("Flat Map", outType, operator);
-
+		return transform("Flat Map", outputType, operator);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 68eebea..1971359 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, QueryableStateStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -46,6 +47,36 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    */
   @Internal
   def getKeyType = javaStream.getKeyType()
+
+
+  // ------------------------------------------------------------------------
+  //  basic transformations
+  // ------------------------------------------------------------------------
+
+  /**
+    * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+    * creating a transformed output stream.
+    *
+    * The function will be called for every element in the stream and can produce
+    * zero or more output. The function can also query the time and set timers. When
+    * reacting to the firing of set timers the function can emit yet more elements.
+    *
+    * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+    * can be used to gain access to features provided by the
+    * [[org.apache.flink.api.common.functions.RichFunction]]
+    *
+    * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element
+    *                   in the stream.
+    */
+  def flatMap[R: TypeInformation](
+      flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+
+    if (flatMapper == null) {
+      throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+    }
+
+    asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]]))
+  }
   
   // ------------------------------------------------------------------------
   //  Windowing


[4/6] flink git commit: [FLINK-4951] Fix Javadoc of KeyedStream.flatMap(TimelyFlatMapFunction)

Posted by al...@apache.org.
[FLINK-4951] Fix Javadoc of KeyedStream.flatMap(TimelyFlatMapFunction)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b9173b35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b9173b35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b9173b35

Branch: refs/heads/master
Commit: b9173b355139f007fb840aa6a196f50b366ad8d8
Parents: 132d8f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 12:21:22 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java   | 25 ++++++++++----------
 1 file changed, 13 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b9173b35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 1bce6a2..922ad20 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -20,10 +20,8 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
@@ -175,19 +173,22 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 	}
 
 	/**
-	 * Applies a FlatMap transformation on a {@link DataStream}. The
-	 * transformation calls a {@link FlatMapFunction} for each element of the
-	 * DataStream. Each FlatMapFunction call can return any number of elements
-	 * including none. The user can also extend {@link RichFlatMapFunction} to
-	 * gain access to other features provided by the
+	 * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+	 * creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the stream and can produce
+	 * zero or more output. The function can also query the time and set timers. When
+	 * reacting to the firing of set timers the function can emit yet more elements.
+	 *
+	 * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+	 * can be used to gain access to features provided by the
 	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 *
-	 * @param flatMapper
-	 *            The FlatMapFunction that is called for each element of the
-	 *            DataStream
+	 * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+	 *                      in the stream.
+	 *
+	 * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
 	 *
-	 * @param <R>
-	 *            output type
 	 * @return The transformed {@link DataStream}.
 	 */
 	public <R> SingleOutputStreamOperator<R> flatMap(TimelyFlatMapFunction<T, R> flatMapper) {


[5/6] flink git commit: [FLINK-4957] Remove Key Serializer Parameter getInternalTimerService()

Posted by al...@apache.org.
[FLINK-4957] Remove Key Serializer Parameter getInternalTimerService()

It's not needed because we can get the key serializer from the keyed
state backend.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b873ac3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b873ac3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b873ac3

Branch: refs/heads/master
Commit: 0b873ac343343fd1d7716d075b54e66324374f47
Parents: 06fb9f1
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 14:34:47 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java     |  2 +-
 .../api/operators/AbstractStreamOperator.java     | 13 ++++++-------
 .../api/operators/StreamTimelyFlatMap.java        |  9 ++-------
 .../api/operators/co/CoStreamTimelyFlatMap.java   | 11 ++---------
 .../operators/windowing/WindowOperator.java       |  2 +-
 .../api/operators/AbstractStreamOperatorTest.java |  1 -
 .../api/operators/TimelyFlatMapTest.java          | 18 +++++++++---------
 .../api/operators/co/TimelyCoFlatMapTest.java     | 16 ++++++++--------
 8 files changed, 29 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index c938f5b..4063b60 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -231,7 +231,7 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 			TypeInformation<R> outputType) {
 
 		StreamTimelyFlatMap<KEY, T, R> operator =
-				new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
+				new StreamTimelyFlatMap<>(clean(flatMapper));
 
 		return transform("Flat Map", outputType, operator);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 7b555b7..839abf8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -772,24 +772,21 @@ public abstract class AbstractStreamOperator<OUT>
 	 *
 	 * @param name The name of the requested timer service. If no service exists under the given
 	 *             name a new one will be created and returned.
-	 * @param keySerializer {@code TypeSerializer} for the keys of the timers.
 	 * @param namespaceSerializer {@code TypeSerializer} for the timer namespace.
 	 * @param triggerable The {@link Triggerable} that should be invoked when timers fire
 	 *
-	 * @param <K> The type of the timer keys.
 	 * @param <N> The type of the timer namespace.
 	 */
-	public <K, N> InternalTimerService<N> getInternalTimerService(
+	public <N> InternalTimerService<N> getInternalTimerService(
 			String name,
-			TypeSerializer<K> keySerializer,
 			TypeSerializer<N> namespaceSerializer,
-			Triggerable<K, N> triggerable) {
+			Triggerable<?, N> triggerable) {
 		if (getKeyedStateBackend() == null) {
 			throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
 		}
 
 		@SuppressWarnings("unchecked")
-		HeapInternalTimerService<K, N> timerService = (HeapInternalTimerService<K, N>) timerServices.get(name);
+		HeapInternalTimerService<Object, N> timerService = (HeapInternalTimerService<Object, N>) timerServices.get(name);
 
 		if (timerService == null) {
 			timerService = new HeapInternalTimerService<>(
@@ -799,7 +796,9 @@ public abstract class AbstractStreamOperator<OUT>
 				getRuntimeContext().getProcessingTimeService());
 			timerServices.put(name, timerService);
 		}
-		timerService.startTimerService(keySerializer, namespaceSerializer, triggerable);
+		@SuppressWarnings({"unchecked", "rawtypes"})
+		Triggerable rawTriggerable = (Triggerable) triggerable;
+		timerService.startTimerService(getKeyedStateBackend().getKeySerializer(), namespaceSerializer, rawTriggerable);
 		return timerService;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
index 962f264..d507ba6 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
@@ -34,17 +33,13 @@ public class StreamTimelyFlatMap<K, IN, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	private final TypeSerializer<K> keySerializer;
-
 	private transient TimestampedCollector<OUT> collector;
 
 	private transient TimerService timerService;
 
-	public StreamTimelyFlatMap(TypeSerializer<K> keySerializer, TimelyFlatMapFunction<IN, OUT> flatMapper) {
+	public StreamTimelyFlatMap(TimelyFlatMapFunction<IN, OUT> flatMapper) {
 		super(flatMapper);
 
-		this.keySerializer = keySerializer;
-
 		chainingStrategy = ChainingStrategy.ALWAYS;
 	}
 
@@ -54,7 +49,7 @@ public class StreamTimelyFlatMap<K, IN, OUT>
 		collector = new TimestampedCollector<>(output);
 
 		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
 
 		this.timerService = new SimpleTimerService(internalTimerService);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
index df2320f..212aafd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamTimelyFlatMap.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators.co;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.state.VoidNamespace;
 import org.apache.flink.runtime.state.VoidNamespaceSerializer;
 import org.apache.flink.streaming.api.SimpleTimerService;
@@ -40,18 +39,12 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
 
 	private static final long serialVersionUID = 1L;
 
-	private final TypeSerializer<K> keySerializer;
-
 	private transient TimestampedCollector<OUT> collector;
 
 	private transient TimerService timerService;
 
-	public CoStreamTimelyFlatMap(
-			TypeSerializer<K> keySerializer,
-			TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
+	public CoStreamTimelyFlatMap(TimelyCoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
-
-		this.keySerializer = keySerializer;
 	}
 
 	@Override
@@ -60,7 +53,7 @@ public class CoStreamTimelyFlatMap<K, IN1, IN2, OUT>
 		collector = new TimestampedCollector<>(output);
 
 		InternalTimerService<VoidNamespace> internalTimerService =
-				getInternalTimerService("user-timers", keySerializer, VoidNamespaceSerializer.INSTANCE, this);
+				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
 
 		this.timerService = new SimpleTimerService(internalTimerService);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index c465767..229d97d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -187,7 +187,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		timestampedCollector = new TimestampedCollector<>(output);
 
 		internalTimerService =
-				getInternalTimerService("window-timers", keySerializer, windowSerializer, this);
+				getInternalTimerService("window-timers", windowSerializer, this);
 
 		context = new Context(null, null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index fd05353..2fb0089 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -448,7 +448,6 @@ public class AbstractStreamOperatorTest {
 
 			this.timerService = getInternalTimerService(
 					"test-timers",
-					IntSerializer.INSTANCE,
 					VoidNamespaceSerializer.INSTANCE,
 					this);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
index f3b09eb..46b52ee 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/TimelyFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testCurrentEventTime() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
+				new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.EVENT_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -79,7 +79,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testCurrentProcessingTime() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
+				new StreamTimelyFlatMap<>(new QueryingFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -107,7 +107,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testEventTimeTimers() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.EVENT_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -137,7 +137,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimers() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, Integer> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -166,7 +166,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testEventTimeTimerWithState() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.EVENT_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -206,7 +206,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimerWithState() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
+				new StreamTimelyFlatMap<>(new TriggeringStatefulFlatMapFunction(TimeDomain.PROCESSING_TIME));
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -239,7 +239,7 @@ public class TimelyFlatMapTest extends TestLogger {
 	public void testSnapshotAndRestore() throws Exception {
 
 		StreamTimelyFlatMap<Integer, Integer, String> operator =
-				new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+				new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		OneInputStreamOperatorTestHarness<Integer, String> testHarness =
 				new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
@@ -254,7 +254,7 @@ public class TimelyFlatMapTest extends TestLogger {
 
 		testHarness.close();
 
-		operator = new StreamTimelyFlatMap<>(IntSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+		operator = new StreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		testHarness = new KeyedOneInputStreamOperatorTestHarness<>(operator, new IdentityKeySelector<Integer>(), BasicTypeInfo.INT_TYPE_INFO);
 
@@ -352,7 +352,7 @@ public class TimelyFlatMapTest extends TestLogger {
 		private static final long serialVersionUID = 1L;
 
 		private final ValueStateDescriptor<Integer> state =
-				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE, null);
+				new ValueStateDescriptor<>("seen-element", IntSerializer.INSTANCE,  null);
 
 		private final TimeDomain timeDomain;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0b873ac3/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
index 25808f4..cb5d6c2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/TimelyCoFlatMapTest.java
@@ -49,7 +49,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testCurrentEventTime() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new WatermarkQueryingFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new WatermarkQueryingFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -85,7 +85,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testCurrentProcessingTime() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeQueryingFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new ProcessingTimeQueryingFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -117,7 +117,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testEventTimeTimers() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new EventTimeTriggeringFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -156,7 +156,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimers() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -193,7 +193,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testEventTimeTimerWithState() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new EventTimeTriggeringStatefulFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new EventTimeTriggeringStatefulFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -242,7 +242,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testProcessingTimeTimerWithState() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new ProcessingTimeTriggeringStatefulFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new ProcessingTimeTriggeringStatefulFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -279,7 +279,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 	public void testSnapshotAndRestore() throws Exception {
 
 		CoStreamTimelyFlatMap<String, Integer, String, String> operator =
-				new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+				new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		TwoInputStreamOperatorTestHarness<Integer, String, String> testHarness =
 				new KeyedTwoInputStreamOperatorTestHarness<>(
@@ -299,7 +299,7 @@ public class TimelyCoFlatMapTest extends TestLogger {
 
 		testHarness.close();
 
-		operator = new CoStreamTimelyFlatMap<>(StringSerializer.INSTANCE, new BothTriggeringFlatMapFunction());
+		operator = new CoStreamTimelyFlatMap<>(new BothTriggeringFlatMapFunction());
 
 		testHarness = new KeyedTwoInputStreamOperatorTestHarness<>(
 				operator,


[2/6] flink git commit: [FLINK-4955] Add Translations Tests for KeyedStream.flatMap(TimelyFlatMapFunction)

Posted by al...@apache.org.
[FLINK-4955] Add Translations Tests for KeyedStream.flatMap(TimelyFlatMapFunction)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06fb9f1b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06fb9f1b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06fb9f1b

Branch: refs/heads/master
Commit: 06fb9f1b4f97ade67a23cd3adc8212e7d848de48
Parents: f0ef370
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 13:58:29 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/DataStreamTest.java     | 50 ++++++++++++++++++++
 .../streaming/api/scala/DataStreamTest.scala    | 28 ++++++++++-
 2 files changed, 77 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 17bea68..5e43120 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -46,6 +47,7 @@ import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamTimelyFlatMap;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.triggers.CountTrigger;
 import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
@@ -544,6 +546,45 @@ public class DataStreamTest {
 		assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
 	}
 
+	/**
+	 * Verify that a timely flat map call is correctly translated to an operator.
+	 */
+	@Test
+	public void testTimelyFlatMapTranslation() {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		DataStreamSource<Long> src = env.generateSequence(0, 0);
+
+		TimelyFlatMapFunction<Long, Integer> timelyFlatMapFunction = new TimelyFlatMapFunction<Long, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public void flatMap(
+					Long value,
+					TimerService timerService,
+					Collector<Integer> out) throws Exception {
+
+			}
+
+			@Override
+			public void onTimer(
+					long timestamp,
+					TimeDomain timeDomain,
+					TimerService timerService,
+					Collector<Integer> out) throws Exception {
+
+			}
+		};
+
+		DataStream<Integer> flatMapped = src
+				.keyBy(new IdentityKeySelector<Long>())
+				.flatMap(timelyFlatMapFunction);
+
+		flatMapped.addSink(new DiscardingSink<Integer>());
+
+		assertEquals(timelyFlatMapFunction, getFunctionForDataStream(flatMapped));
+		assertTrue(getOperatorForDataStream(flatMapped) instanceof StreamTimelyFlatMap);
+	}
+
 	@Test
 	public void operatorTest() {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -809,6 +850,15 @@ public class DataStreamTest {
 		}
 	}
 
+	private static class IdentityKeySelector<T> implements KeySelector<T, T> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public T getKey(T value) throws Exception {
+			return value;
+		}
+	}
+
 	public static class CustomPOJO {
 		private String s;
 		private int i;

http://git-wip-us.apache.org/repos/asf/flink/blob/06fb9f1b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index b73eae8..de8b388 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -22,10 +22,12 @@ import java.lang
 
 import org.apache.flink.api.common.functions._
 import org.apache.flink.api.java.typeutils.TypeExtractor
+import org.apache.flink.streaming.api.{TimeDomain, TimerService}
 import org.apache.flink.streaming.api.collector.selector.OutputSelector
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
 import org.apache.flink.streaming.api.functions.co.CoMapFunction
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
-import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator}
+import org.apache.flink.streaming.api.operators.{AbstractUdfStreamOperator, StreamOperator, StreamTimelyFlatMap}
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows
 import org.apache.flink.streaming.api.windowing.triggers.{CountTrigger, PurgingTrigger}
 import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
@@ -315,6 +317,30 @@ class DataStreamTest extends StreamingMultipleProgramsTestBase {
     // TODO check for custom case class
   }
 
+  /**
+   * Verify that a timely flat map call is correctly translated to an operator.
+   */
+  @Test
+  def testTimelyFlatMapTranslation(): Unit = {
+    val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+    val src = env.generateSequence(0, 0)
+
+    val timelyFlatMapFunction = new TimelyFlatMapFunction[Long, Int] {
+      override def flatMap(value: Long, timerService: TimerService, out: Collector[Int]): Unit = ???
+      override def onTimer(
+          timestamp: Long,
+          timeDomain: TimeDomain,
+          timerService: TimerService,
+          out: Collector[Int]): Unit = ???
+    }
+
+    val flatMapped = src.keyBy(x => x).flatMap(timelyFlatMapFunction)
+
+    assert(timelyFlatMapFunction == getFunctionForDataStream(flatMapped))
+    assert(getOperatorForDataStream(flatMapped).isInstanceOf[StreamTimelyFlatMap[_, _, _]])
+  }
+
   @Test def operatorTest() {
     val env = StreamExecutionEnvironment.getExecutionEnvironment