You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/11/11 15:21:00 UTC

[jira] [Commented] (FLINK-10845) Support DISTINCT aggregates for batch

    [ https://issues.apache.org/jira/browse/FLINK-10845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16682906#comment-16682906 ] 

ASF GitHub Bot commented on FLINK-10845:
----------------------------------------

xueyumusic opened a new pull request #7079: [FLINK-10845][table] Support multiple different DISTINCT aggregates for batch
URL: https://github.com/apache/flink/pull/7079
 
 
   ## What is the purpose of the change
   
   This PR tried to support multiple distinct aggregates with different arguments for batch. For other cases such as single distinct or multiple with the same arguments it seemed to have supported. Actually it is the problem of IS_NOT_DISTINCT_FROM codegen support result from calcite rewrite
   
   ## Brief change log
   
     - *CodeGenerator and ScalarOperator*
   
   ## Verifying this change
   
   This change added tests and can be verified as follows:
   
     - *Added AggregateITCase batch test*
   
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): (no)
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
     - The serializers: (no)
     - The runtime per-record code paths (performance sensitive): (no)
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
     - The S3 file system connector: (no)
   
   ## Documentation
   
     - Does this pull request introduce a new feature? (no)
     - If yes, how is the feature documented? (not documented)
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Support DISTINCT aggregates for batch
> -------------------------------------
>
>                 Key: FLINK-10845
>                 URL: https://issues.apache.org/jira/browse/FLINK-10845
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API &amp; SQL
>            Reporter: Timo Walther
>            Assignee: xueyu
>            Priority: Major
>              Labels: pull-request-available
>
> Currently, we support distinct aggregates for streaming. However, executing the same query on batch like the following test:
> {code}
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     val tEnv = TableEnvironment.getTableEnvironment(env)
>     val sqlQuery =
>       "SELECT b, " +
>       "  SUM(DISTINCT (a / 3)), " +
>       "  COUNT(DISTINCT SUBSTRING(c FROM 1 FOR 2))," +
>       "  COUNT(DISTINCT c) " +
>       "FROM MyTable " +
>       "GROUP BY b"
>     val data = new mutable.MutableList[(Int, Long, String)]
>     data.+=((1, 1L, "Hi"))
>     data.+=((2, 2L, "Hello"))
>     data.+=((3, 2L, "Hello world"))
>     data.+=((4, 3L, "Hello world, how are you?"))
>     data.+=((5, 3L, "I am fine."))
>     data.+=((6, 3L, "Luke Skywalker"))
>     data.+=((7, 4L, "Comment#1"))
>     data.+=((8, 4L, "Comment#2"))
>     data.+=((9, 4L, "Comment#3"))
>     data.+=((10, 4L, "Comment#4"))
>     data.+=((11, 5L, "Comment#5"))
>     data.+=((12, 5L, "Comment#6"))
>     data.+=((13, 5L, "Comment#7"))
>     data.+=((14, 5L, "Comment#8"))
>     data.+=((15, 5L, "Comment#9"))
>     data.+=((16, 6L, "Comment#10"))
>     data.+=((17, 6L, "Comment#11"))
>     data.+=((18, 6L, "Comment#12"))
>     data.+=((19, 6L, "Comment#13"))
>     data.+=((20, 6L, "Comment#14"))
>     data.+=((21, 6L, "Comment#15"))
>     val t = env.fromCollection(data).toTable(tEnv).as('a, 'b, 'c)
>     tEnv.registerTable("MyTable", t)
>     tEnv.sqlQuery(sqlQuery).toDataSet[Row].print()
> {code}
> Fails with:
> {code}
> org.apache.flink.table.codegen.CodeGenException: Unsupported call: IS NOT DISTINCT FROM 
> If you think this function should be supported, you can create an issue and start a discussion for it.
> 	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> 	at org.apache.flink.table.codegen.CodeGenerator$$anonfun$visitCall$3.apply(CodeGenerator.scala:1027)
> 	at scala.Option.getOrElse(Option.scala:121)
> 	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:1027)
> 	at org.apache.flink.table.codegen.CodeGenerator.visitCall(CodeGenerator.scala:66)
> 	at org.apache.calcite.rex.RexCall.accept(RexCall.java:107)
> 	at org.apache.flink.table.codegen.CodeGenerator.generateExpression(CodeGenerator.scala:247)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.addInnerJoin(DataSetJoin.scala:221)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:170)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetJoin.translateToPlan(DataSetJoin.scala:165)
> 	at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:91)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:498)
> 	at org.apache.flink.table.api.BatchTableEnvironment.translate(BatchTableEnvironment.scala:476)
> 	at org.apache.flink.table.api.scala.BatchTableEnvironment.toDataSet(BatchTableEnvironment.scala:141)
> 	at org.apache.flink.table.api.scala.TableConversions.toDataSet(TableConversions.scala:50)
> 	at org.apache.flink.table.runtime.stream.sql.SqlITCase.testDistinctGroupBy(SqlITCase.scala:2
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)