Posted to issues@flink.apache.org by "Till Rohrmann (JIRA)" <ji...@apache.org> on 2018/11/28 17:41:00 UTC
[jira] [Reopened] (FLINK-10763) Interval join produces wrong result type in Scala API
[ https://issues.apache.org/jira/browse/FLINK-10763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Till Rohrmann reopened FLINK-10763:
-----------------------------------
> Interval join produces wrong result type in Scala API
> -----------------------------------------------------
>
> Key: FLINK-10763
> URL: https://issues.apache.org/jira/browse/FLINK-10763
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.6.2
> Reporter: wangjinhai
> Assignee: Timo Walther
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.3, 1.7.0, 1.8.0
>
>
> When the stream type is a Scala case class, the TypeInformation of the interval join's output falls back to GenericType in the process function. This results in bad performance, and a union of the result with another DataStream of the same case class fails.
> The union method of DataStream first checks that both streams have equal types.
> Here is an example:
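> {code:scala}
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.util.Collector
>
> object Test {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     // Interval joins are only supported in event time.
>     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>
>     val orderA: DataStream[Order] = env.fromCollection(Seq(
>       Order(1L, "beer", 3),
>       Order(1L, "diaper", 4),
>       Order(3L, "rubber", 2)))
>     val orderB: DataStream[Order] = env.fromCollection(Seq(
>       Order(2L, "pen", 3),
>       Order(2L, "rubber", 3),
>       Order(4L, "beer", 1)))
>
>     // Interval join on the user key; the join function just forwards the left element.
>     val orderC: DataStream[Order] = orderA.keyBy(_.user)
>       .intervalJoin(orderB.keyBy(_.user))
>       .between(Time.seconds(0), Time.seconds(0))
>       .process(new ProcessJoinFunction[Order, Order, Order] {
>         override def processElement(
>             left: Order,
>             right: Order,
>             ctx: ProcessJoinFunction[Order, Order, Order]#Context,
>             out: Collector[Order]): Unit = {
>           out.collect(left)
>         }
>       })
>
>     // Compare the TypeInformation of the join result (C) with that of a plain source (B).
>     println("C: " + orderC.dataType.toString)
>     println("B: " + orderB.dataType.toString)
>
>     // Throws IllegalArgumentException while the job is being built, because the types differ.
>     orderC.union(orderB).print()
>     env.execute()
>   }
>
>   case class Order(user: Long, product: String, amount: Int)
> }{code}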
> Here is the exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<com.manbuyun.awesome.flink.Test.Order> and com.manbuyun.awesome.flink.Test$Order(user: Long, product: String, amount: Integer)
> at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:219)
> at org.apache.flink.streaming.api.scala.DataStream.union(DataStream.scala:357)
> at com.manbuyun.awesome.flink.Test$.main(Test.scala:38)
> at com.manbuyun.awesome.flink.Test.main(Test.scala){code}
>
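The type check described in the report can be illustrated with a small, self-contained Scala model. This is a hypothetical sketch, not Flink code. `TypeDescriptor`, `GenericTypeDescriptor`, `CaseClassTypeDescriptor` and `ToyStream` are illustrative names invented here. The model only reproduces the equality check that rejects the union. It does not model Flink's type extraction or serialization.

```scala
// Hypothetical, simplified model of the type check performed by DataStream.union.
// None of these classes are Flink APIs; they only mimic the fact that a generic
// (Kryo-serialized) type and a case class type for the same class compare unequal.
sealed trait TypeDescriptor

// Stands in for the GenericType<...> that the interval join produced.
final case class GenericTypeDescriptor(className: String) extends TypeDescriptor {
  override def toString: String = s"GenericType<$className>"
}

// Stands in for the case class type information derived by the Scala API.
final case class CaseClassTypeDescriptor(className: String, fields: List[(String, String)])
    extends TypeDescriptor {
  override def toString: String =
    fields.map { case (name, tpe) => s"$name: $tpe" }.mkString(s"$className(", ", ", ")")
}

final case class ToyStream(dataType: TypeDescriptor) {
  // Like the check described in the report: union compares the element types
  // for equality first and rejects the call if they differ.
  def union(other: ToyStream): ToyStream = {
    if (dataType != other.dataType)
      throw new IllegalArgumentException(
        s"Cannot union streams of different types: $dataType and ${other.dataType}")
    this // the model only tracks types, so the result keeps this stream's type
  }
}

object UnionTypeCheckDemo {
  def main(args: Array[String]): Unit = {
    val fields = List("user" -> "Long", "product" -> "String", "amount" -> "Integer")
    val orderB = ToyStream(CaseClassTypeDescriptor("Order", fields))
    val orderC = ToyStream(GenericTypeDescriptor("Order")) // type after the interval join

    println(orderB.union(orderB).dataType) // equal types: accepted
    try orderC.union(orderB) // different types: rejected
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

Both descriptors name the same `Order` class. They still compare unequal, which is why the union in the report is rejected even though `orderC` and `orderB` are both declared as `DataStream[Order]`.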
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)