You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/08 08:24:21 UTC
[flink] 01/02: [FLINK-14380][ScalaAPI] Passing inferred outputType
directly to map and flatMap function
This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 47c277d85aca6ce288505b13cd3a3595911e6bfa
Author: Marco Zühlke <mz...@apache.org>
AuthorDate: Sat Oct 26 00:00:21 2019 +0200
[FLINK-14380][ScalaAPI] Passing inferred outputType directly to map and flatMap function
This closes #9999
---
.../streaming/api/datastream/ConnectedStreams.java | 37 +++++++++++++++++-
.../flink/streaming/api/datastream/DataStream.java | 45 +++++++++++++++++++++-
.../streaming/api/scala/ConnectedStreams.scala | 4 +-
.../flink/streaming/api/scala/DataStream.scala | 4 +-
4 files changed, 82 insertions(+), 8 deletions(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index d4a34c9..6060a29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -239,8 +239,22 @@ public class ConnectedStreams<IN1, IN2> {
Utils.getCallLocationName(),
true);
- return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
+ return map(coMapper, outTypeInfo);
+ }
+ /**
+ * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
+ * the output to a common type. The transformation calls a
+ * {@link CoMapFunction#map1} for each element of the first input and
+ * {@link CoMapFunction#map2} for each element of the second input. Each
+ * CoMapFunction call returns exactly one element.
+ *
+ * @param coMapper The CoMapFunction used to jointly transform the two input DataStreams
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ * @return The transformed {@link DataStream}
+ */
+ public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper, TypeInformation<R> outputType) {
+ return transform("Co-Map", outputType, new CoStreamMap<>(inputStream1.clean(coMapper)));
}
/**
@@ -271,7 +285,26 @@ public class ConnectedStreams<IN1, IN2> {
Utils.getCallLocationName(),
true);
- return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
+ return flatMap(coFlatMapper, outTypeInfo);
+ }
+
+ /**
+ * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+ * maps the output to a common type. The transformation calls a
+ * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+ * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+ * input. Each CoFlatMapFunction call returns any number of elements
+ * including none.
+ *
+ * @param coFlatMapper
+ * The CoFlatMapFunction used to jointly transform the two input
+ * DataStreams
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @return The transformed {@link DataStream}
+ */
+ public <R> SingleOutputStreamOperator<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapper, TypeInformation<R> outputType) {
+ return transform("Co-Flat Map", outputType, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
}
/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bd12cb2..b7bea55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -590,7 +590,27 @@ public class DataStream<T> {
TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
Utils.getCallLocationName(), true);
- return transform("Map", outType, new StreamMap<>(clean(mapper)));
+ return map(mapper, outType);
+ }
+
+ /**
+ * Applies a Map transformation on a {@link DataStream}. The transformation
+ * calls a {@link MapFunction} for each element of the DataStream. Each
+ * MapFunction call returns exactly one element. The user can also extend
+ * {@link RichMapFunction} to gain access to other features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param mapper
+ * The MapFunction that is called for each element of the
+ * DataStream.
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
+ return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
/**
@@ -614,7 +634,28 @@ public class DataStream<T> {
TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
- return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
+ return flatMap(flatMapper, outType);
+ }
+
+ /**
+ * Applies a FlatMap transformation on a {@link DataStream}. The
+ * transformation calls a {@link FlatMapFunction} for each element of the
+ * DataStream. Each FlatMapFunction call can return any number of elements
+ * including none. The user can also extend {@link RichFlatMapFunction} to
+ * gain access to other features provided by the
+ * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+ *
+ * @param flatMapper
+ * The FlatMapFunction that is called for each element of the
+ * DataStream
+ * @param outputType {@link TypeInformation} for the result type of the function.
+ *
+ * @param <R>
+ * output type
+ * @return The transformed {@link DataStream}.
+ */
+ public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
+ return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
}
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 68514b7..caaf22d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -97,7 +97,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- asScalaStream(javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]])
+ asScalaStream(javaStream.map(coMapper, outType).asInstanceOf[JavaStream[R]])
}
/**
@@ -176,7 +176,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]])
+ asScalaStream(javaStream.flatMap(coFlatMapper, outType).asInstanceOf[JavaStream[R]])
}
/**
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 15dca2c..c2ca15a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -631,7 +631,7 @@ class DataStream[T](stream: JavaStream[T]) {
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- asScalaStream(stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]])
+ asScalaStream(stream.map(mapper, outType).asInstanceOf[JavaStream[R]])
}
/**
@@ -644,7 +644,7 @@ class DataStream[T](stream: JavaStream[T]) {
}
val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
- asScalaStream(stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]])
+ asScalaStream(stream.flatMap(flatMapper, outType).asInstanceOf[JavaStream[R]])
}
/**