You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:49 UTC
[22/24] flink git commit: [hotfix] [streaming scala] Expose key type
information for key selectors on connected data streams
[hotfix] [streaming scala] Expose key type information for key selectors on connected data streams
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb1f5fd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb1f5fd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb1f5fd5
Branch: refs/heads/master
Commit: bb1f5fd58c5a371941531740d573300ab020503b
Parents: 4ee5b4c
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 23:58:40 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200
----------------------------------------------------------------------
.../streaming/api/scala/ConnectedStreams.scala | 30 ++++++++++++++------
1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/bb1f5fd5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 4727cc5..f7413b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
+import org.apache.flink.api.java.typeutils.{TypeExtractor, ResultTypeQueryable}
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, KeyedStream => JKeyedStream}
import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
import org.apache.flink.util.Collector
import scala.reflect.ClassTag
@@ -238,18 +239,18 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
* The function used for grouping the second input
* @return The grouped { @link ConnectedStreams}
*/
- def keyBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+ def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2: IN2 => K2):
ConnectedStreams[IN1, IN2] = {
+ val keyType1 = implicitly[TypeInformation[K1]]
+ val keyType2 = implicitly[TypeInformation[K2]]
+
val cleanFun1 = clean(fun1)
val cleanFun2 = clean(fun2)
- val keyExtractor1 = new KeySelector[IN1, K] {
- def getKey(in: IN1) = cleanFun1(in)
- }
- val keyExtractor2 = new KeySelector[IN2, L] {
- def getKey(in: IN2) = cleanFun2(in)
- }
-
+
+ val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
+ val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
+
javaStream.keyBy(keyExtractor1, keyExtractor2)
}
@@ -351,3 +352,14 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
}
}
+
+class KeySelectorWithType[IN, K](
+ private[this] val fun: IN => K,
+ private[this] val info: TypeInformation[K])
+ extends KeySelector[IN, K] with ResultTypeQueryable[K] {
+
+ override def getKey(value: IN): K = fun(value)
+
+ override def getProducedType: TypeInformation[K] = info
+}
+
\ No newline at end of file