You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/10/16 18:08:49 UTC

[22/24] flink git commit: [hotfix] [streaming scala] Expose key type information for key selectors on connected data streams

[hotfix] [streaming scala] Expose key type information for key selectors on connected data streams


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb1f5fd5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb1f5fd5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb1f5fd5

Branch: refs/heads/master
Commit: bb1f5fd58c5a371941531740d573300ab020503b
Parents: 4ee5b4c
Author: Stephan Ewen <se...@apache.org>
Authored: Fri Oct 9 23:58:40 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 16 15:26:11 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/scala/ConnectedStreams.scala  | 30 ++++++++++++++------
 1 file changed, 21 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb1f5fd5/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
index 4727cc5..f7413b7 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/ConnectedStreams.scala
@@ -20,7 +20,8 @@ package org.apache.flink.streaming.api.scala
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream}
+import org.apache.flink.api.java.typeutils.{TypeExtractor, ResultTypeQueryable}
+import org.apache.flink.streaming.api.datastream.{ConnectedStreams => JavaCStream, DataStream => JavaStream, KeyedStream => JKeyedStream}
 import org.apache.flink.streaming.api.functions.co.{CoFlatMapFunction, CoMapFunction}
 import org.apache.flink.util.Collector
 import scala.reflect.ClassTag
@@ -238,18 +239,18 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
    * The function used for grouping the second input
    * @return The grouped { @link ConnectedStreams}
    */
-  def keyBy[K: TypeInformation, L: TypeInformation](fun1: IN1 => K, fun2: IN2 => L):
+  def keyBy[K1: TypeInformation, K2: TypeInformation](fun1: IN1 => K1, fun2: IN2 => K2):
   ConnectedStreams[IN1, IN2] = {
 
+    val keyType1 = implicitly[TypeInformation[K1]]
+    val keyType2 = implicitly[TypeInformation[K2]]
+    
     val cleanFun1 = clean(fun1)
     val cleanFun2 = clean(fun2)
-    val keyExtractor1 = new KeySelector[IN1, K] {
-      def getKey(in: IN1) = cleanFun1(in)
-    }
-    val keyExtractor2 = new KeySelector[IN2, L] {
-      def getKey(in: IN2) = cleanFun2(in)
-    }
-
+    
+    val keyExtractor1 = new KeySelectorWithType[IN1, K1](cleanFun1, keyType1)
+    val keyExtractor2 = new KeySelectorWithType[IN2, K2](cleanFun2, keyType2)
+    
     javaStream.keyBy(keyExtractor1, keyExtractor2)
   }
 
@@ -351,3 +352,14 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
   }
 
 }
+
+class KeySelectorWithType[IN, K](
+        private[this] val fun: IN => K,
+        private[this] val info: TypeInformation[K])
+  extends KeySelector[IN, K] with ResultTypeQueryable[K] {
+  
+  override def getKey(value: IN): K = fun(value)
+
+  override def getProducedType: TypeInformation[K] = info
+}
+  
\ No newline at end of file