Posted to issues@flink.apache.org by "wangjinhai (JIRA)" <ji...@apache.org> on 2018/11/02 14:51:00 UTC
[jira] [Created] (FLINK-10763) Cannot union streams of different types in the union method of DataStream class
wangjinhai created FLINK-10763:
----------------------------------
Summary: Cannot union streams of different types in the union method of DataStream class
Key: FLINK-10763
URL: https://issues.apache.org/jira/browse/FLINK-10763
Project: Flink
Issue Type: Bug
Components: DataStream API
Affects Versions: 1.6.2, 1.5.5, 1.4.2, 1.3.3
Reporter: wangjinhai
Fix For: 1.5.6, 1.6.3, 1.7.0
When the element type of a stream is a Scala case class, its TypeInformation falls back to GenericType in the process function. This results in bad performance, and a union with another DataStream fails.
The union method of DataStream first checks that the types of the streams are equal.
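To illustrate why that check rejects two streams with the same element class, here is a minimal, self-contained Java sketch. It does not use Flink: `TypeInfo`, `Stream`, `Order`, and the "generic" / "caseClass" labels are hypothetical stand-ins, and only the shape of the equality check and the start of the error message follow the stack trace in this report.

```java
import java.util.Objects;

class UnionTypeCheckSketch {

    /** Hypothetical stand-in for a stream's type descriptor (not Flink's TypeInformation). */
    static final class TypeInfo {
        final String kind;     // "generic" or "caseClass"; labels invented for this sketch
        final Class<?> clazz;  // the element class the descriptor describes

        TypeInfo(String kind, Class<?> clazz) {
            this.kind = kind;
            this.clazz = clazz;
        }

        // Two descriptors are equal only if both the kind and the class match.
        @Override
        public boolean equals(Object o) {
            if (!(o instanceof TypeInfo)) {
                return false;
            }
            TypeInfo other = (TypeInfo) o;
            return kind.equals(other.kind) && clazz.equals(other.clazz);
        }

        @Override
        public int hashCode() {
            return Objects.hash(kind, clazz);
        }

        @Override
        public String toString() {
            return kind + "<" + clazz.getSimpleName() + ">";
        }
    }

    /** Hypothetical stand-in for a stream; it carries only its type descriptor. */
    static final class Stream {
        final TypeInfo type;

        Stream(TypeInfo type) {
            this.type = type;
        }

        // Assumed shape of the check: compare descriptors before building the union.
        Stream union(Stream other) {
            if (!type.equals(other.type)) {
                throw new IllegalArgumentException(
                    "Cannot union streams of different types: " + type + " and " + other.type);
            }
            return new Stream(type);
        }
    }

    /** Placeholder element class. */
    static final class Order {}

    public static void main(String[] args) {
        Stream joined = new Stream(new TypeInfo("generic", Order.class));    // plays the role of orderC
        Stream source = new Stream(new TypeInfo("caseClass", Order.class));  // plays the role of orderB
        try {
            joined.union(source);
        } catch (IllegalArgumentException e) {
            // Same element class, different descriptor kind: the union is rejected.
            System.out.println(e.getMessage());
        }
    }
}
```

In this sketch the element class is identical on both sides, and the union is rejected only because the descriptors differ in kind. That mirrors the situation reported here, where one side is a GenericType and the other a case class type.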
Here is an example:
{code:scala}
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

object Test {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val orderA: DataStream[Order] = env.fromCollection(Seq(
      Order(1L, "beer", 3),
      Order(1L, "diaper", 4),
      Order(3L, "rubber", 2)))
    val orderB: DataStream[Order] = env.fromCollection(Seq(
      new Order(2L, "pen", 3),
      new Order(2L, "rubber", 3),
      new Order(4L, "beer", 1)))
    // Interval join of the two keyed streams; the result type is also Order
    val orderC: DataStream[Order] = orderA.keyBy(_.user)
      .intervalJoin(orderB.keyBy(_.user))
      .between(Time.seconds(0), Time.seconds(0))
      .process(new ProcessJoinFunction[Order, Order, Order] {
        override def processElement(left: Order, right: Order, ctx: ProcessJoinFunction[Order, Order, Order]#Context, out: Collector[Order]): Unit = {
          out.collect(left)
        }
      })
    // orderC reports a GenericType, orderB the case class type
    println("C: " + orderC.dataType.toString)
    println("B: " + orderB.dataType.toString)
    // Fails because the two types are not equal
    orderC.union(orderB).print()
    env.execute()
  }
  case class Order(user: Long, product: String, amount: Int)
}
{code}
Here is the exception:
{code:java}
Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<com.manbuyun.awesome.flink.Test.Order> and com.manbuyun.awesome.flink.Test$Order(user: Long, product: String, amount: Integer)
    at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:219)
    at org.apache.flink.streaming.api.scala.DataStream.union(DataStream.scala:357)
    at com.manbuyun.awesome.flink.Test$.main(Test.scala:38)
    at com.manbuyun.awesome.flink.Test.main(Test.scala)
{code}
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)