You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2016/11/07 15:26:34 UTC
[6/6] flink git commit: [FLINK-4952] [scala] Add
KeyedStream.flatMap(TimelyFlatMapFunction)
[FLINK-4952] [scala] Add KeyedStream.flatMap(TimelyFlatMapFunction)
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0ef3703
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0ef3703
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0ef3703
Branch: refs/heads/master
Commit: f0ef370399638689c2e1adc54a3acf0afab67a17
Parents: b9173b3
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 28 13:47:06 2016 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Nov 7 16:25:57 2016 +0100
----------------------------------------------------------------------
.../streaming/api/datastream/KeyedStream.java | 31 ++++++++++++++++--
.../flink/streaming/api/scala/KeyedStream.scala | 33 +++++++++++++++++++-
2 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
index 922ad20..c938f5b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/KeyedStream.java
@@ -202,11 +202,38 @@ public class KeyedStream<T, KEY> extends DataStream<T> {
Utils.getCallLocationName(),
true);
+ return flatMap(flatMapper, outType);
+ }
+
+ /**
+ * Applies the given {@link TimelyFlatMapFunction} on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * <p>The function will be called for every element in the stream and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * <p>A {@link org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction}
+ * can be used to gain access to features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param flatMapper The {@link TimelyFlatMapFunction} that is called for each element
+ * in the stream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R> The of elements emitted by the {@code TimelyFlatMapFunction}.
+ *
+ * @return The transformed {@link DataStream}.
+ */
+ @Internal
+ public <R> SingleOutputStreamOperator<R> flatMap(
+ TimelyFlatMapFunction<T, R> flatMapper,
+ TypeInformation<R> outputType) {
+
StreamTimelyFlatMap<KEY, T, R> operator =
new StreamTimelyFlatMap<>(keyType.createSerializer(getExecutionConfig()), clean(flatMapper));
- return transform("Flat Map", outType, operator);
-
+ return transform("Flat Map", outputType, operator);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/f0ef3703/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
----------------------------------------------------------------------
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
index 68eebea..1971359 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/KeyedStream.scala
@@ -23,7 +23,8 @@ import org.apache.flink.api.common.functions._
import org.apache.flink.api.common.state.{FoldingStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, ValueStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.common.typeutils.TypeSerializer
-import org.apache.flink.streaming.api.datastream.{DataStream => JavaStream, KeyedStream => KeyedJavaStream, QueryableStateStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.datastream.{QueryableStateStream, SingleOutputStreamOperator, DataStream => JavaStream, KeyedStream => KeyedJavaStream, WindowedStream => WindowedJavaStream}
+import org.apache.flink.streaming.api.functions.TimelyFlatMapFunction
import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType
import org.apache.flink.streaming.api.functions.aggregation.{ComparableAggregator, SumAggregator}
import org.apache.flink.streaming.api.functions.query.{QueryableAppendingStateOperator, QueryableValueStateOperator}
@@ -46,6 +47,36 @@ class KeyedStream[T, K](javaStream: KeyedJavaStream[T, K]) extends DataStream[T]
*/
@Internal
def getKeyType = javaStream.getKeyType()
+
+
+ // ------------------------------------------------------------------------
+ // basic transformations
+ // ------------------------------------------------------------------------
+
+ /**
+ * Applies the given [[TimelyFlatMapFunction]] on the input stream, thereby
+ * creating a transformed output stream.
+ *
+ * The function will be called for every element in the stream and can produce
+ * zero or more output. The function can also query the time and set timers. When
+ * reacting to the firing of set timers the function can emit yet more elements.
+ *
+ * A [[org.apache.flink.streaming.api.functions.RichTimelyFlatMapFunction]]
+ * can be used to gain access to features provided by the
+ * [[org.apache.flink.api.common.functions.RichFunction]]
+ *
+ * @param flatMapper The [[TimelyFlatMapFunction]] that is called for each element
+ * in the stream.
+ */
+ def flatMap[R: TypeInformation](
+ flatMapper: TimelyFlatMapFunction[T, R]): DataStream[R] = {
+
+ if (flatMapper == null) {
+ throw new NullPointerException("TimelyFlatMapFunction must not be null.")
+ }
+
+ asScalaStream(javaStream.flatMap(flatMapper, implicitly[TypeInformation[R]]))
+ }
// ------------------------------------------------------------------------
// Windowing