You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/07 15:26:34 UTC

[6/6] flink git commit: [FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)

[FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0ef3703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0ef3703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0ef3703

Branch: refs/heads/master
Commit: f0ef370399638689c2e1adc54a3acf0afab67a17
Parents: b9173b3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 13:47:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/KeyedStream.java   | 31 ++++++++++++++++--
 .../flink/streaming/api/scala/KeyedStream.scala | 33 +++++++++++++++++++-
 2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 922ad20..c938f5b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -202,11 +202,38 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
 				Utils.getCallLocationName(),
 				true);
 
+		return flatMap(flatMapper, outType);
+	}
+
+	/**
+	 * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+	 * creating a transformed output stream.
+	 *
+	 * <p>The function will be called for every element in the stream and can produce
+	 * zero or more output elements. The function can also query the time and set timers. When
+	 * reacting to the firing of set timers the function can emit yet more elements.
+	 *
+	 * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+	 * can be used to gain access to features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+	 *                      in the stream.
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @param <R> The type of elements emitted by the {@code TimelyFlatMapFunction}.
+	 *
+	 * @return The transformed {@link DataStream}.
+	 */
+	@Internal
+	public <R> SingleOutputStreamOperator<R> flatMap(
+			TimelyFlatMapFunction<T, R> flatMapper,
+			TypeInformation<R> outputType) {
+
 		StreamTimelyFlatMap<KEY, T, R> operator =
 				new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
 
-		return transform("Flat Map", outType, operator);
-
+		return transform("Flat Map", outputType, operator);
 	}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 68eebea..1971359 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.functions._
 import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, QueryableStateStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
 import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
 import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
 import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -46,6 +47,36 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
    */
   @Internal
   def getKeyType = javaStream.getKeyType()
+
+
+  // ------------------------------------------------------------------------
+  //  basic transformations
+  // ------------------------------------------------------------------------
+
+  /**
+    * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+    * creating a transformed output stream.
+    *
+    * The function will be called for every element in the stream and can produce
+    * zero or more output elements. The function can also query the time and set timers. When
+    * reacting to the firing of set timers the function can emit yet more elements.
+    *
+    * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+    * can be used to gain access to features provided by the
+    * [[org.apache.flink.api.common.functions.RichFunction]] interface.
+    *
+    * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element
+    *                   in the stream.
+    */
+  def flatMap[R: TypeInformation](
+      flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+
+    if (flatMapper == null) {
+      throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+    }
+
+    asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]]))
+  }
   
   // ------------------------------------------------------------------------
   //  Windowing