You are viewing a plain text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain text version.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 23:00:24 UTC

flink git commit: [hotfix] Add ResultTypeQueryable to Keys in Stream CoGroup/Join

Repository: flink
Updated Branches:
  refs/heads/master f2186a604 -> 5c2c112b2


[hotfix] Add ResultTypeQueryable to Keys in Stream CoGroup/Join


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c2c112b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c2c112b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c2c112b

Branch: refs/heads/master
Commit: 5c2c112b2b2d3a8f14d7cd82da940d11feb8e097
Parents: f2186a6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 7 22:59:08 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:59:56 2015 +0200

----------------------------------------------------------------------
 .../streaming/api/scala/CoGroupedStreams.scala  | 33 +++++++++++++-------
 .../streaming/api/scala/JoinedStreams.scala     | 32 ++++++++++++-------
 2 files changed, 42 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 1b16e44..0164b92 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.CoGroupFunction
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala._
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
@@ -69,23 +69,27 @@ object CoGroupedStreams {
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
-    def where[KEY](keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+    def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
     }
 
     /**
      * Specifies a [[KeySelector]] for elements from the second input.
      */
-    def equalTo[KEY](keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+    def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
     }
 
     /**
@@ -112,17 +116,20 @@ object CoGroupedStreams {
       input1: DataStream[T1],
       input2: DataStream[T2],
       keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY]) {
+      keySelector2: KeySelector[T2, KEY],
+      keyType: TypeInformation[KEY]) {
 
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
     def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, keyType)
     }
 
     /**
@@ -130,10 +137,12 @@ object CoGroupedStreams {
      */
     def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, keyType)
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index be059b8..2fda32d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
 import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
 import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
 import org.apache.flink.streaming.api.windowing.evictors.Evictor
@@ -66,23 +67,27 @@ object JoinedStreams {
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
-    def where[KEY](keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+    def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
     }
 
     /**
      * Specifies a [[KeySelector]] for elements from the second input.
      */
-    def equalTo[KEY](keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+    def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val keyType = implicitly[TypeInformation[KEY]]
+      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = keyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
     }
 
     /**
@@ -109,17 +114,20 @@ object JoinedStreams {
       input1: DataStream[T1],
       input2: DataStream[T2],
       keySelector1: KeySelector[T1, KEY],
-      keySelector2: KeySelector[T2, KEY]) {
+      keySelector2: KeySelector[T2, KEY],
+      keyType: TypeInformation[KEY]) {
 
     /**
      * Specifies a [[KeySelector]] for elements from the first input.
      */
     def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T1, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T1) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+      new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, localKeyType)
     }
 
     /**
@@ -127,10 +135,12 @@ object JoinedStreams {
      */
     def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
       val cleanFun = clean(keySelector)
-      val javaSelector = new KeySelector[T2, KEY] {
+      val localKeyType = keyType
+      val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
         def getKey(in: T2) = cleanFun(in)
+        override def getProducedType: TypeInformation[KEY] = localKeyType
       }
-      new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+      new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, localKeyType)
     }
 
     /**