You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2019/11/08 08:24:21 UTC

[flink] 01/02: [FLINK-14380][ScalaAPI] Passing inferred outputType directly to map and flatMap function

This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47c277d85aca6ce288505b13cd3a3595911e6bfa
Author: Marco Zühlke <mz...@apache.org>
AuthorDate: Sat Oct 26 00:00:21 2019 +0200

    [FLINK-14380][ScalaAPI] Passing inferred outputType directly to map and flatMap function
    
    This closes #9999
---
 .../streaming/api/datastream/ConnectedStreams.java | 37 +++++++++++++++++-
 .../flink/streaming/api/datastream/DataStream.java | 45 +++++++++++++++++++++-
 .../streaming/api/scala/ConnectedStreams.scala     |  4 +-
 .../flink/streaming/api/scala/DataStream.scala     |  4 +-
 4 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
index d4a34c9..6060a29 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedStreams.java
@@ -239,8 +239,22 @@ public class ConnectedStreams<IN1, IN2> {
 			Utils.getCallLocationName(),
 			true);
 
-		return transform("Co-Map", outTypeInfo, new CoStreamMap<>(inputStream1.clean(coMapper)));
+		return map(coMapper, outTypeInfo);
+	}
 
+	/**
+	 * Applies a CoMap transformation on a {@link ConnectedStreams} and maps
+	 * the output to a common type. The transformation calls
+	 * {@link CoMapFunction#map1} for each element of the first input and
+	 * {@link CoMapFunction#map2} for each element of the second input. Each
+	 * CoMapFunction call returns exactly one element.
+	 *
+	 * @param coMapper The CoMapFunction used to jointly transform the two input DataStreams
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 * @return The transformed {@link DataStream}
+	 */
+	public <R> SingleOutputStreamOperator<R> map(CoMapFunction<IN1, IN2, R> coMapper, TypeInformation<R> outputType) {
+		return transform("Co-Map", outputType, new CoStreamMap<>(inputStream1.clean(coMapper)));
 	}
 
 	/**
@@ -271,7 +285,26 @@ public class ConnectedStreams<IN1, IN2> {
 			Utils.getCallLocationName(),
 			true);
 
-		return transform("Co-Flat Map", outTypeInfo, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
+		return flatMap(coFlatMapper, outTypeInfo);
+	}
+
+	/**
+	 * Applies a CoFlatMap transformation on a {@link ConnectedStreams} and
+	 * maps the output to a common type. The transformation calls
+	 * {@link CoFlatMapFunction#flatMap1} for each element of the first input
+	 * and {@link CoFlatMapFunction#flatMap2} for each element of the second
+	 * input. Each CoFlatMapFunction call returns any number of elements,
+	 * including none.
+	 *
+	 * @param coFlatMapper
+	 *            The CoFlatMapFunction used to jointly transform the two input
+	 *            DataStreams
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @return The transformed {@link DataStream}
+	 */
+	public <R> SingleOutputStreamOperator<R> flatMap(CoFlatMapFunction<IN1, IN2, R> coFlatMapper, TypeInformation<R> outputType) {
+		return transform("Co-Flat Map", outputType, new CoStreamFlatMap<>(inputStream1.clean(coFlatMapper)));
 	}
 
 	/**
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index bd12cb2..b7bea55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -590,7 +590,27 @@ public class DataStream<T> {
 		TypeInformation<R> outType = TypeExtractor.getMapReturnTypes(clean(mapper), getType(),
 				Utils.getCallLocationName(), true);
 
-		return transform("Map", outType, new StreamMap<>(clean(mapper)));
+		return map(mapper, outType);
+	}
+
+	/**
+	 * Applies a Map transformation on a {@link DataStream}. The transformation
+	 * calls a {@link MapFunction} for each element of the DataStream. Each
+	 * MapFunction call returns exactly one element. The user can also extend
+	 * {@link RichMapFunction} to gain access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param mapper
+	 *            The MapFunction that is called for each element of the
+	 *            DataStream.
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @param <R>
+	 *            output type
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
+		return transform("Map", outputType, new StreamMap<>(clean(mapper)));
 	}
 
 	/**
@@ -614,7 +634,28 @@ public class DataStream<T> {
 		TypeInformation<R> outType = TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
 				getType(), Utils.getCallLocationName(), true);
 
-		return transform("Flat Map", outType, new StreamFlatMap<>(clean(flatMapper)));
+		return flatMap(flatMapper, outType);
+	}
+
+	/**
+	 * Applies a FlatMap transformation on a {@link DataStream}. The
+	 * transformation calls a {@link FlatMapFunction} for each element of the
+	 * DataStream. Each FlatMapFunction call can return any number of elements,
+	 * including none. The user can also extend {@link RichFlatMapFunction} to
+	 * gain access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 *
+	 * @param flatMapper
+	 *            The FlatMapFunction that is called for each element of the
+	 *            DataStream
+	 * @param outputType {@link TypeInformation} for the result type of the function.
+	 *
+	 * @param <R>
+	 *            output type
+	 * @return The transformed {@link DataStream}.
+	 */
+	public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
+		return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
 
 	}
 
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 68514b7..caaf22d 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -97,7 +97,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]    
-    asScalaStream(javaStream.map(coMapper).returns(outType).asInstanceOf[JavaStream[R]])
+    asScalaStream(javaStream.map(coMapper, outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
@@ -176,7 +176,7 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(javaStream.flatMap(coFlatMapper).returns(outType).asInstanceOf[JavaStream[R]])
+    asScalaStream(javaStream.flatMap(coFlatMapper, outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 15dca2c..c2ca15a 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -631,7 +631,7 @@ class DataStream[T](stream: JavaStream[T]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(stream.map(mapper).returns(outType).asInstanceOf[JavaStream[R]])
+    asScalaStream(stream.map(mapper, outType).asInstanceOf[JavaStream[R]])
   }
 
   /**
@@ -644,7 +644,7 @@ class DataStream[T](stream: JavaStream[T]) {
     }
 
     val outType : TypeInformation[R] = implicitly[TypeInformation[R]]
-    asScalaStream(stream.flatMap(flatMapper).returns(outType).asInstanceOf[JavaStream[R]])
+    asScalaStream(stream.flatMap(flatMapper, outType).asInstanceOf[JavaStream[R]])
   }
 
   /**