Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2018/11/20 10:37:00 UTC

[jira] [Closed] (FLINK-10763) Interval join produces wrong result type in Scala API

     [ https://issues.apache.org/jira/browse/FLINK-10763?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther closed FLINK-10763.
--------------------------------
    Resolution: Fixed

Fixed in master: f629b05a2cdc2c07f4a19456cf5b3e5fdd6ff607
Fixed in 1.7.1: 43aa594e9f0787e9e1c5fcddaa60e845a748a237
Fixed in 1.6.3: febd1c6f534a4108a0eaf6fa6847e2d01e0b0712

> Interval join produces wrong result type in Scala API
> -----------------------------------------------------
>
>                 Key: FLINK-10763
>                 URL: https://issues.apache.org/jira/browse/FLINK-10763
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.6.2
>            Reporter: wangjinhai
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.3, 1.8.0, 1.7.1
>
>
> When the stream's element type is a Scala case class, the TypeInformation falls back to GenericType in the process function, which results in bad performance and causes a failure when the result is unioned with another DataStream.
> In the union method of DataStream, the type is first checked for equality.
> Here is an example:
> {code:java}
> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.util.Collector
>
> object Test {
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     val orderA: DataStream[Order] = env.fromCollection(Seq(
>       Order(1L, "beer", 3),
>       Order(1L, "diaper", 4),
>       Order(3L, "rubber", 2)))
>     val orderB: DataStream[Order] = env.fromCollection(Seq(
>       Order(2L, "pen", 3),
>       Order(2L, "rubber", 3),
>       Order(4L, "beer", 1)))
>     // Interval join of two keyed streams; the result should again be a DataStream[Order].
>     val orderC: DataStream[Order] = orderA.keyBy(_.user)
>       .intervalJoin(orderB.keyBy(_.user))
>       .between(Time.seconds(0), Time.seconds(0))
>       .process(new ProcessJoinFunction[Order, Order, Order] {
>         override def processElement(
>             left: Order,
>             right: Order,
>             ctx: ProcessJoinFunction[Order, Order, Order]#Context,
>             out: Collector[Order]): Unit = {
>           out.collect(left)
>         }
>       })
>     // orderC reports GenericType, while orderB reports the case class type.
>     println("C: " + orderC.dataType.toString)
>     println("B: " + orderB.dataType.toString)
>     // union compares the two types for equality and throws because they differ.
>     orderC.union(orderB).print()
>     env.execute()
>   }
>   case class Order(user: Long, product: String, amount: Int)
> }{code}
> Here is the Exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<com.manbuyun.awesome.flink.Test.Order> and com.manbuyun.awesome.flink.Test$Order(user: Long, product: String, amount: Integer)
>  at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:219)
>  at org.apache.flink.streaming.api.scala.DataStream.union(DataStream.scala:357)
>  at com.manbuyun.awesome.flink.Test$.main(Test.scala:38)
>  at com.manbuyun.awesome.flink.Test.main(Test.scala){code}
>  
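
The equality check mentioned in the description can be illustrated without a Flink dependency. The following is only a minimal sketch and not Flink's implementation: `TypeInfo`, `GenericTypeInfo`, `CaseClassTypeInfo`, `Stream`, and `UnionCheckSketch` are hypothetical stand-ins, chosen to mimic the behaviour visible in the exception above, where a generic type descriptor and a case class type descriptor for the same class compare as unequal.

```scala
// Hypothetical stand-ins for type descriptors; these are NOT Flink classes.
sealed trait TypeInfo

final case class GenericTypeInfo(className: String) extends TypeInfo {
  override def toString: String = s"GenericType<$className>"
}

final case class CaseClassTypeInfo(className: String, fields: Seq[(String, String)])
    extends TypeInfo {
  override def toString: String =
    fields.map { case (name, tpe) => s"$name: $tpe" }.mkString(s"$className(", ", ", ")")
}

// A toy stream that carries nothing but its type descriptor.
final case class Stream(dataType: TypeInfo) {
  // Mimics the check described in the issue: types are compared for equality first.
  def union(other: Stream): Stream = {
    if (dataType != other.dataType) {
      throw new IllegalArgumentException(
        s"Cannot union streams of different types: $dataType and ${other.dataType}")
    }
    this
  }
}

object UnionCheckSketch {
  def main(args: Array[String]): Unit = {
    val orderB = Stream(CaseClassTypeInfo(
      "Order", Seq("user" -> "Long", "product" -> "String", "amount" -> "Integer")))
    // Assumption: models the join result before the fix, described as GenericType.
    val orderC = Stream(GenericTypeInfo("Order"))

    try orderC.union(orderB)
    catch { case e: IllegalArgumentException => println(e.getMessage) }
  }
}
```

In this model, two descriptors for the same class are unequal as soon as one of them is the generic fallback, which is why the union in the reported example is rejected before the job runs.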



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)