You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2019/01/07 09:38:01 UTC

[jira] [Updated] (FLINK-10845) Support DISTINCT aggregates for batch

     [ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Timo Walther updated FLINK-10845:
---------------------------------
    Fix Version/s: 1.8.0

> Support DISTINCT aggregates for batch
> -------------------------------------
>
>                 Key: FLINK-10845
>                 URL: https://issues.apache.org/jira/browse/FLINK-10845
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.8.0
>
>
> Currently, we support distinct aggregates for streaming. However, executing the same query on batch like the following test:
> {code}
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val sqlQuery =
>       "SELECT b, " +
>       "  SUM(DISTINCT (a / 3)), " +
>       "  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>       "  COUNT(DISTINCT c) " +
>       "FROM MyTable " +
>       "GROUP BY b"
>     val data = new mutable.MutableList[(Int, Long, String)]
>     data.+=((1, 1L, "Hi"))
>     data.+=((2, 2L, "Hello"))
>     data.+=((3, 2L, "Hello world"))
>     data.+=((4, 3L, "Hello world, how are you?"))
>     data.+=((5, 3L, "I am fine."))
>     data.+=((6, 3L, "Luke Skywalker"))
>     data.+=((7, 4L, "Comment#1"))
>     data.+=((8, 4L, "Comment#2"))
>     data.+=((9, 4L, "Comment#3"))
>     data.+=((10, 4L, "Comment#4"))
>     data.+=((11, 5L, "Comment#5"))
>     data.+=((12, 5L, "Comment#6"))
>     data.+=((13, 5L, "Comment#7"))
>     data.+=((14, 5L, "Comment#8"))
>     data.+=((15, 5L, "Comment#9"))
>     data.+=((16, 6L, "Comment#10"))
>     data.+=((17, 6L, "Comment#11"))
>     data.+=((18, 6L, "Comment#12"))
>     data.+=((19, 6L, "Comment#13"))
>     data.+=((20, 6L, "Comment#14"))
>     data.+=((21, 6L, "Comment#15"))
>     val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("MyTable", t)
>     tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> Fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM 
> If you think this function should be supported, you can create an issue and start a discussion for it.
> 	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> 	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
> 	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
> 	at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
> 	at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
> 	at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
> 	at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
> 	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)