Posted to issues@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2019/01/07 09:38:01 UTC
[jira] [Closed] (FLINK-10845) Support DISTINCT aggregates for batch
[ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Timo Walther closed FLINK-10845.
--------------------------------
Resolution: Fixed
> Support DISTINCT aggregates for batch
> -------------------------------------
>
> Key: FLINK-10845
> URL: https://issues.apache.org/jira/browse/FLINK-10845
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Timo Walther
> Assignee: xueyu
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.8.0
>
>
> Currently, we support DISTINCT aggregates for streaming. However, executing the same kind of query on batch, as in the following test:
> {code}
> // Imports assumed for the Flink 1.7-era Scala APIs used below
> import org.apache.flink.api.scala._
> import org.apache.flink.table.api.TableEnvironment
> import org.apache.flink.table.api.scala._
> import org.apache.flink.types.Row
> import scala.collection.mutable
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env)
> val sqlQuery =
> "SELECT b, " +
> " SUM(DISTINCT (a / 3)), " +
> " COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
> " COUNT(DISTINCT c) " +
> "FROM MyTable " +
> "GROUP BY b"
> val data = new mutable.MutableList[(Int, Long, String)]
> data.+=((1, 1L, "Hi"))
> data.+=((2, 2L, "Hello"))
> data.+=((3, 2L, "Hello world"))
> data.+=((4, 3L, "Hello world, how are you?"))
> data.+=((5, 3L, "I am fine."))
> data.+=((6, 3L, "Luke Skywalker"))
> data.+=((7, 4L, "Comment#1"))
> data.+=((8, 4L, "Comment#2"))
> data.+=((9, 4L, "Comment#3"))
> data.+=((10, 4L, "Comment#4"))
> data.+=((11, 5L, "Comment#5"))
> data.+=((12, 5L, "Comment#6"))
> data.+=((13, 5L, "Comment#7"))
> data.+=((14, 5L, "Comment#8"))
> data.+=((15, 5L, "Comment#9"))
> data.+=((16, 6L, "Comment#10"))
> data.+=((17, 6L, "Comment#11"))
> data.+=((18, 6L, "Comment#12"))
> data.+=((19, 6L, "Comment#13"))
> data.+=((20, 6L, "Comment#14"))
> data.+=((21, 6L, "Comment#15"))
> val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
> tEnv.registerTable("MyTable", t)
> tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> Fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
> If you think this function should be supported, you can create an issue and start a discussion for it.
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
> at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
> at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
> at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
> at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
> at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
> at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
> at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
> at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
> at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
> at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
> {code}
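
For reference, the rows the query above should return once DISTINCT aggregates work on batch can be sketched with plain Scala collections. This is only an illustration under standard SQL semantics, not Flink code: the object name DistinctAggSketch and the helper expected are hypothetical, integer division is assumed for a / 3, and NULL handling is ignored because the test data contains no nulls.

```scala
// Plain Scala collections only; no Flink dependency is assumed here.
object DistinctAggSketch {
  // Same rows as the test in the issue description: (a, b, c)
  val data: Seq[(Int, Long, String)] = Seq(
    (1, 1L, "Hi"), (2, 2L, "Hello"), (3, 2L, "Hello world"),
    (4, 3L, "Hello world, how are you?"), (5, 3L, "I am fine."),
    (6, 3L, "Luke Skywalker"),
    (7, 4L, "Comment#1"), (8, 4L, "Comment#2"), (9, 4L, "Comment#3"),
    (10, 4L, "Comment#4"),
    (11, 5L, "Comment#5"), (12, 5L, "Comment#6"), (13, 5L, "Comment#7"),
    (14, 5L, "Comment#8"), (15, 5L, "Comment#9"),
    (16, 6L, "Comment#10"), (17, 6L, "Comment#11"), (18, 6L, "Comment#12"),
    (19, 6L, "Comment#13"), (20, 6L, "Comment#14"), (21, 6L, "Comment#15")
  )

  // One output row per group key b:
  // (b, SUM(DISTINCT a / 3), COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2)), COUNT(DISTINCT c))
  def expected(rows: Seq[(Int, Long, String)]): Seq[(Long, Int, Int, Int)] =
    rows.groupBy(_._2).toSeq.sortBy(_._1).map { case (b, group) =>
      val sumDistinct = group.map(_._1 / 3).distinct.sum      // integer division, then de-duplicate
      val prefixCount = group.map(_._3.take(2)).distinct.size // first two characters of c
      val cCount      = group.map(_._3).distinct.size
      (b, sumDistinct, prefixCount, cCount)
    }

  def main(args: Array[String]): Unit =
    expected(data).foreach(println)
}
```

Running main prints one tuple per value of b, sorted by b, which can be compared against the output of the batch query.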