You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/07 23:00:24 UTC
flink git commit: [hotfix] Add ResultTypeQueryable to Keys in Stream
CoGroup/Join
Repository: flink
Updated Branches:
refs/heads/master f2186a604 -> 5c2c112b2
[hotfix] Add ResultTypeQueryable to Keys in Stream CoGroup/Join
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5c2c112b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5c2c112b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5c2c112b
Branch: refs/heads/master
Commit: 5c2c112b2b2d3a8f14d7cd82da940d11feb8e097
Parents: f2186a6
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Wed Oct 7 22:59:08 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Wed Oct 7 22:59:56 2015 +0200
----------------------------------------------------------------------
.../streaming/api/scala/CoGroupedStreams.scala | 33 +++++++++++++-------
.../streaming/api/scala/JoinedStreams.scala | 32 ++++++++++++-------
2 files changed, 42 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
index 1b16e44..0164b92 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CoGroupedStreams.scala
@@ -21,7 +21,7 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.functions.CoGroupFunction
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
-import org.apache.flink.api.scala._
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{CoGroupedStreams => JavaCoGroupedStreams}
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.evictors.Evictor
@@ -69,23 +69,27 @@ object CoGroupedStreams {
/**
* Specifies a [[KeySelector]] for elements from the first input.
*/
- def where[KEY](keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+ def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T1, KEY] {
+ val keyType = implicitly[TypeInformation[KEY]]
+ val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T1) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = keyType
}
- new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+ new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
}
/**
* Specifies a [[KeySelector]] for elements from the second input.
*/
- def equalTo[KEY](keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
+ def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T2, KEY] {
+ val keyType = implicitly[TypeInformation[KEY]]
+ val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T2) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = keyType
}
- new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+ new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
}
/**
@@ -112,17 +116,20 @@ object CoGroupedStreams {
input1: DataStream[T1],
input2: DataStream[T2],
keySelector1: KeySelector[T1, KEY],
- keySelector2: KeySelector[T2, KEY]) {
+ keySelector2: KeySelector[T2, KEY],
+ keyType: TypeInformation[KEY]) {
/**
* Specifies a [[KeySelector]] for elements from the first input.
*/
def where(keySelector: T1 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T1, KEY] {
+ val localKeyType = keyType
+ val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T1) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = localKeyType
}
- new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+ new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, keyType)
}
/**
@@ -130,10 +137,12 @@ object CoGroupedStreams {
*/
def equalTo(keySelector: T2 => KEY): CoGroupedStreams.WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T2, KEY] {
+ val localKeyType = keyType
+ val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T2) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = localKeyType
}
- new CoGroupedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+ new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, keyType)
}
/**
http://git-wip-us.apache.org/repos/asf/flink/blob/5c2c112b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
index be059b8..2fda32d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/JoinedStreams.scala
@@ -21,6 +21,7 @@ package org.apache.flink.streaming.api.scala
import org.apache.flink.api.common.functions.{FlatJoinFunction, JoinFunction}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable
import org.apache.flink.streaming.api.datastream.{JoinedStreams => JavaJoinedStreams, CoGroupedStreams => JavaCoGroupedStreams}
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.evictors.Evictor
@@ -66,23 +67,27 @@ object JoinedStreams {
/**
* Specifies a [[KeySelector]] for elements from the first input.
*/
- def where[KEY](keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+ def where[KEY: TypeInformation](keySelector: T1 => KEY): WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T1, KEY] {
+ val keyType = implicitly[TypeInformation[KEY]]
+ val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T1) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = keyType
}
- new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, null)
+ new WithKey[T1, T2, KEY](input1, input2, javaSelector, null, keyType)
}
/**
* Specifies a [[KeySelector]] for elements from the second input.
*/
- def equalTo[KEY](keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
+ def equalTo[KEY: TypeInformation](keySelector: T2 => KEY): WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T2, KEY] {
+ val keyType = implicitly[TypeInformation[KEY]]
+ val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T2) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = keyType
}
- new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, null, javaSelector)
+ new WithKey[T1, T2, KEY](input1, input2, null, javaSelector, keyType)
}
/**
@@ -109,17 +114,20 @@ object JoinedStreams {
input1: DataStream[T1],
input2: DataStream[T2],
keySelector1: KeySelector[T1, KEY],
- keySelector2: KeySelector[T2, KEY]) {
+ keySelector2: KeySelector[T2, KEY],
+ keyType: TypeInformation[KEY]) {
/**
* Specifies a [[KeySelector]] for elements from the first input.
*/
def where(keySelector: T1 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T1, KEY] {
+ val localKeyType = keyType
+ val javaSelector = new KeySelector[T1, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T1) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = localKeyType
}
- new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2)
+ new WithKey[T1, T2, KEY](input1, input2, javaSelector, keySelector2, localKeyType)
}
/**
@@ -127,10 +135,12 @@ object JoinedStreams {
*/
def equalTo(keySelector: T2 => KEY): JoinedStreams.WithKey[T1, T2, KEY] = {
val cleanFun = clean(keySelector)
- val javaSelector = new KeySelector[T2, KEY] {
+ val localKeyType = keyType
+ val javaSelector = new KeySelector[T2, KEY] with ResultTypeQueryable[KEY] {
def getKey(in: T2) = cleanFun(in)
+ override def getProducedType: TypeInformation[KEY] = localKeyType
}
- new JoinedStreams.WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector)
+ new WithKey[T1, T2, KEY](input1, input2, keySelector1, javaSelector, localKeyType)
}
/**