Posted to dev@flink.apache.org by "Timo Walther (JIRA)" <ji...@apache.org> on 2018/11/09 16:14:00 UTC
[jira] [Created] (FLINK-10845) Support DISTINCT aggregates for batch
Timo Walther created FLINK-10845:
------------------------------------
Summary: Support DISTINCT aggregates for batch
Key: FLINK-10845
URL: https://issues.apache.org/jira/browse/FLINK-10845
Project: Flink
Issue Type: New Feature
Components: Table API & SQL
Reporter: Timo Walther
Currently, we support DISTINCT aggregates for streaming. However, executing the same query in a batch environment, as in the following test:
{code}
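// Imports needed by this snippet (package layout as of the Flink 1.x Table API)
import org.apache.flink.api.scala._
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._
import org.apache.flink.types.Row
import scala.collection.mutable
// Batch (DataSet) environment; the same query works on a streaming environment
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env)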
val sqlQuery =
"SELECT b, " +
" SUM(DISTINCT (a / 3)), " +
" COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
" COUNT(DISTINCT c) " +
"FROM MyTable " +
"GROUP BY b"
val data = new mutable.MutableList[(Int, Long, String)]
data.+=((1, 1L, "Hi"))
data.+=((2, 2L, "Hello"))
data.+=((3, 2L, "Hello world"))
data.+=((4, 3L, "Hello world, how are you?"))
data.+=((5, 3L, "I am fine."))
data.+=((6, 3L, "Luke Skywalker"))
data.+=((7, 4L, "Comment#1"))
data.+=((8, 4L, "Comment#2"))
data.+=((9, 4L, "Comment#3"))
data.+=((10, 4L, "Comment#4"))
data.+=((11, 5L, "Comment#5"))
data.+=((12, 5L, "Comment#6"))
data.+=((13, 5L, "Comment#7"))
data.+=((14, 5L, "Comment#8"))
data.+=((15, 5L, "Comment#9"))
data.+=((16, 6L, "Comment#10"))
data.+=((17, 6L, "Comment#11"))
data.+=((18, 6L, "Comment#12"))
data.+=((19, 6L, "Comment#13"))
data.+=((20, 6L, "Comment#14"))
data.+=((21, 6L, "Comment#15"))
val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
tEnv.registerTable("MyTable", t)
tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
{code}
Fails with:
{code}
org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM
If you think this function should be supported, you can create an issue and start a discussion for it.
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
{code}
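The trace shows code generation failing inside DataSetJoin on an IS NOT DISTINCT FROM call. Presumably the planner expands the DISTINCT aggregates into joins on the grouping key and uses this null-safe comparison as the join condition, though that is an assumption based on the trace. The following is a minimal sketch of the semantics the batch code generator would need to provide. The object and method names are hypothetical and not part of Flink or Calcite, and SQL NULL is modelled as None.
{code}
// Hypothetical illustration only: not Flink or Calcite code.
object NullSafeEquality {
  // a IS NOT DISTINCT FROM b: true when both sides are NULL,
  // or both are non-NULL and equal; never returns NULL itself.
  def isNotDistinctFrom[T](a: Option[T], b: Option[T]): Boolean = (a, b) match {
    case (None, None)       => true
    case (Some(x), Some(y)) => x == y
    case _                  => false
  }
  // Plain SQL equality: the result is NULL (None) if either side is NULL.
  def sqlEquals[T](a: Option[T], b: Option[T]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y
  def main(args: Array[String]): Unit = {
    val nullKey: Option[Long] = None
    println(sqlEquals(nullKey, nullKey))         // prints None
    println(isNotDistinctFrom(nullKey, nullKey)) // prints true
  }
}
{code}
With plain equality, rows whose grouping key is NULL would never match in the join. The null-safe form keeps them, which is why it cannot simply be replaced by "=" in the generated join condition.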