Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2018/11/19 15:07:01 UTC

[jira] [Commented] (FLINK-10763) Interval join produces wrong result type in Scala API

    [ https://issues.apache.org/jira/browse/FLINK-10763?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16691812#comment-16691812 ] 

Timo Walther commented on FLINK-10763:
--------------------------------------

I looked into this issue. The union operator is not the problem here. Instead, the interval join produces a wrong result type because it relies on the Java type extraction instead of the Scala type extraction.
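
To illustrate the difference between the two extraction paths, here is a minimal sketch (not the actual fix). It assumes Flink's Scala API is on the classpath; the object name TypeExtractionComparison is made up for this example. The Java TypeExtractor analyzes the class by reflection and does not know about case classes, while the Scala createTypeInformation macro does. Consistent with the exception quoted below, the first is expected to yield a generic type and the second a case class type, so the two are not equal.

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.api.scala._

// Hypothetical helper object, only for illustration.
object TypeExtractionComparison {

  case class Order(user: Long, product: String, amount: Int)

  // Java-side extraction: reflection on the class, no case class support.
  def javaInfo: TypeInformation[Order] =
    TypeExtractor.createTypeInfo(classOf[Order])

  // Scala-side extraction: macro-generated type information for the case class.
  def scalaInfo: TypeInformation[Order] =
    createTypeInformation[Order]

  def main(args: Array[String]): Unit = {
    println("Java extraction:  " + javaInfo)
    println("Scala extraction: " + scalaInfo)
    // DataStream.union compares the type information of both inputs,
    // so a mismatch like this one makes the union fail.
    println("Equal: " + (javaInfo == scalaInfo))
  }
}
```

An operator in the Scala API that ends up on the Java path therefore reports a different type than the other Scala streams, which is what the reporter's example runs into.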

> Interval join produces wrong result type in Scala API
> -----------------------------------------------------
>
>                 Key: FLINK-10763
>                 URL: https://issues.apache.org/jira/browse/FLINK-10763
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>    Affects Versions: 1.3.3, 1.4.2, 1.5.5, 1.6.2
>            Reporter: wangjinhai
>            Assignee: Timo Walther
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.5.6, 1.6.3, 1.8.0, 1.7.1
>
>
> When the stream's element type is a Scala case class, the TypeInformation falls back to GenericType in the process function, which results in bad performance, and a subsequent union with another DataStream fails.
> In the union method of DataStream, the type is first checked for equality.
> Here is an example:
> {code:java}
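> import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction
> import org.apache.flink.streaming.api.scala._
> import org.apache.flink.streaming.api.windowing.time.Time
> import org.apache.flink.util.Collector
>
> object Test {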
>     def main(args: Array[String]): Unit = {
>       val env = StreamExecutionEnvironment.getExecutionEnvironment
>       val orderA: DataStream[Order] = env.fromCollection(Seq(
>         Order(1L, "beer", 3),
>         Order(1L, "diaper", 4),
>         Order(3L, "rubber", 2)))
>       val orderB: DataStream[Order] = env.fromCollection(Seq(
>         new Order(2L, "pen", 3),
>         new Order(2L, "rubber", 3),
>         new Order(4L, "beer", 1)))
>       val orderC: DataStream[Order] = orderA.keyBy(_.user)
>         .intervalJoin(orderB.keyBy(_.user))
>         .between(Time.seconds(0), Time.seconds(0))
>         .process(new ProcessJoinFunction[Order, Order, Order] {
>           override def processElement(left: Order, right: Order, ctx: ProcessJoinFunction[Order, Order, Order]#Context, out: Collector[Order]): Unit = {
>             out.collect(left)
>           }})
>       println("C: " + orderC.dataType.toString)
>       println("B: " + orderB.dataType.toString)
>       orderC.union(orderB).print()
>       env.execute()
>     }
>     case class Order(user: Long, product: String, amount: Int)
> }{code}
> Here is the Exception:
> {code:java}
> Exception in thread "main" java.lang.IllegalArgumentException: Cannot union streams of different types: GenericType<com.manbuyun.awesome.flink.Test.Order> and com.manbuyun.awesome.flink.Test$Order(user: Long, product: String, amount: Integer)
>  at org.apache.flink.streaming.api.datastream.DataStream.union(DataStream.java:219)
>  at org.apache.flink.streaming.api.scala.DataStream.union(DataStream.scala:357)
>  at com.manbuyun.awesome.flink.Test$.main(Test.scala:38)
>  at com.manbuyun.awesome.flink.Test.main(Test.scala){code}
>  


