Posted to issues@flink.apache.org by sunjincheng121 <gi...@git.apache.org> on 2018/01/04 15:29:36 UTC

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/5241

    [FLINK-8325][table] Add COUNT(*),COUNT(1) supported

    *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.*
    
    *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.*
    
    ## What is the purpose of the change
    
    * This PR adds support for COUNT(\*) and COUNT(1).
    
    ## Brief change log
    
      - Add a constructor parameter to `CountAggFunction`
      - Use `aggregateCall.argList` as the parameter to create the `CountAggFunction` instance
        in `AggregateUtil.transformToAggregateFunctions`
    
    
    ## Verifying this change
    
    This change added tests and can be verified as follows:
    
      - *Add `GroupWindowITCase` to test bounded and unbounded group agg.*
      - *Add `CountAggFunctionWithConstantTest` to test COUNT aggregate function with non-nullable parameter*
      - *Modify some batch test cases to match the corrected results.*
     
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): (no)
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
      - The serializers: (no)
      - The runtime per-record code paths (performance sensitive): (no)
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
      - The S3 file system connector: (no)
    
    ## Documentation
    
      - Does this pull request introduce a new feature? (no)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-8325-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5241.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5241
    
----
commit 51621e29fa6cb0780716c884ab9b9f0c3f468b08
Author: 金竹 <ji...@...>
Date:   2018-01-03T15:13:49Z

    [FLINK-8325][table] Add COUNT(*),COUNT(1) supported

----


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160251387
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala ---
    @@ -33,7 +33,20 @@ class CountAccumulator extends JTuple1[Long] {
     /**
       * built-in count aggregate function
       */
    -class CountAggFunction extends AggregateFunction[JLong, CountAccumulator] {
    +class CountAggFunction
    +  extends AggregateFunction[JLong, CountAccumulator] {
    +
    +  // process argument is optimized by calcite.
    --- End diff --
    
    `calcite` -> `Calcite`


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160454995
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    Maybe I did not clearly express what I mean. :) 


---

[GitHub] flink issue #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supported

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5241
  
    Please rebase the PR on master once #5320 is merged.
    I'll have a look at the changes after the rebase.
    
    Thank you, Fabian


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162643699
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -829,7 +833,8 @@ object AggregateUtil {
           outputType: RelDataType,
           groupings: Array[Int]): (Option[DataSetPreAggFunction],
             Option[TypeInformation[Row]],
    -        RichGroupReduceFunction[Row, Row]) = {
    +        RichGroupReduceFunction[Row, Row],
    +        Option[MapPartitionFunction[Row, Row]]) = {
    --- End diff --
    
    Please remove this parameter and extend `DataSetFinalAggFunction` to also implement `RichMapPartitionFunction` such that it can be used as both (similar to `DataSetPreAggFunction`).
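
The "usable as both" idea in the comment above can be sketched without any Flink dependency. In this minimal sketch the traits `GroupReduce` and `MapPartition` and the class `FinalSum` are hypothetical stand-ins, not Flink's `RichGroupReduceFunction`, `RichMapPartitionFunction`, or `DataSetFinalAggFunction`; the point is only that one class can serve both roles by delegating to shared logic.

```scala
object DualRoleFunction {
  // Hypothetical stand-ins for the two function interfaces named in the review.
  trait GroupReduce[I, O] {
    def reduce(values: Iterable[I], out: O => Unit): Unit
  }
  trait MapPartition[I, O] {
    def mapPartition(values: Iterable[I], out: O => Unit): Unit
  }

  // One class implements both interfaces and shares a single aggregation routine,
  // so the caller can pick whichever role it needs without an extra return value.
  class FinalSum extends GroupReduce[Long, Long] with MapPartition[Long, Long] {
    private def emitSum(values: Iterable[Long], out: Long => Unit): Unit =
      out(values.sum)

    override def reduce(values: Iterable[Long], out: Long => Unit): Unit =
      emitSum(values, out)

    override def mapPartition(values: Iterable[Long], out: Long => Unit): Unit =
      emitSum(values, out)
  }
}
```

With this shape, the method under review would not need a fourth tuple element, because the same instance could be passed wherever either interface is expected.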


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160265977
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -145,6 +145,14 @@ class DataStreamOverAggregate(
           inputSchema.typeInfo,
           Some(constants))
     
    +    val constantsTypeInfo =
    +      Some(constants).map(_.map(generator.generateExpression(_))).getOrElse(Seq()).map(_.resultType)
    +  val aggInputTypeInfo = constantsTypeInfo.++:(inputSchema.fieldTypeInfos)
    +
    +    val aggregateInputType =
    +      cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    --- End diff --
    
    It should be possible to create a `RelDataType` without going through code generation and `TypeInformation`. We have a `RelDataType` for the input row and `RelDataType`s for the `RexNode`s. 
    
    Check if we can use `FlinkTypeFactory.createStructType()` to create a `RelDataType`.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160251401
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala ---
    @@ -33,7 +33,20 @@ class CountAccumulator extends JTuple1[Long] {
     /**
       * built-in count aggregate function
       */
    -class CountAggFunction extends AggregateFunction[JLong, CountAccumulator] {
    +class CountAggFunction
    +  extends AggregateFunction[JLong, CountAccumulator] {
    +
    +  // process argument is optimized by calcite.
    +  // For instance count(42) or count(*) which will optimized to count().
    +  def accumulate(acc: CountAccumulator): Unit = {
    +    acc.f0 += 1L
    +  }
    +
    +  // process argument is optimized by calcite.
    --- End diff --
    
    `calcite` -> `Calcite`


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160414914
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala ---
    @@ -254,7 +254,7 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
     
    -    val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
    --- End diff --
    
    Yes, when using `COUNT(*)` we must change the expected result.
    ``` 
     expected: [3,1,1,Hi, 3,2,2,Hello, 3,3,2,Hello world]
     received: [4,1,1,Hi, 4,2,2,Hello, 4,3,2,Hello world] 
    ```


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160426130
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala ---
    @@ -254,7 +254,7 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
     
    -    val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
    --- End diff --
    
    Hi @fhueske, I do not think the PR breaks the correctness of the test :). When we run this test case, the query matches `DataSetAggregateWithNullValuesRule`, which unions a NULL row into the DataSet. As a result, `COUNT(a1)` returns `3` and `COUNT(*)` returns `4`. I think this is a bug in the batch code path. The snippet is as follows:
    ![image](https://user-images.githubusercontent.com/22488084/34726497-250f4b30-f58f-11e7-92cf-595f9c9a64e0.png) 
    I think we can discuss this bug in FLINK-8355. What do you think?
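
The `3` versus `4` difference described above can be reproduced with a small model. This is a minimal sketch, assuming a one-column row type where `None` stands in for SQL NULL; `Row`, `countStar`, and `countColumn` are illustrative names, not Flink classes.

```scala
object CountSemantics {
  // A row with a single nullable column; None models SQL NULL.
  final case class Row(a1: Option[Int])

  // COUNT(*): counts every row, including rows that contain only NULLs.
  def countStar(rows: Seq[Row]): Long = rows.size.toLong

  // COUNT(a1): counts only the rows where a1 is not NULL.
  def countColumn(rows: Seq[Row]): Long = rows.count(_.a1.isDefined).toLong

  // Three data rows, as in the small 3-tuple test data set.
  val input: Seq[Row] = Seq(Row(Some(1)), Row(Some(2)), Row(Some(3)))

  // The same input with one all-NULL row appended, mimicking the row
  // that the rule is described as unioning in.
  val withNullRow: Seq[Row] = input :+ Row(None)
}
```

On `withNullRow`, `countColumn` still ignores the extra row while `countStar` includes it, which matches the expected and received values quoted earlier in this thread.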



---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160430205
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    I expect `true` 6 times and `false` 9 times, but because of `DataSetAggregateWithNullValuesRule` we get `null` 9 times instead of `false`, as follows:
    ```
     expected: [false, false, false, false, false, false, false, false, false, true, true, true, true, true, true]
     received: [null, null, null, null, null, null, null, null, null, true, true, true, true, true, true] 
    ```
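
The quoted Scaladoc says the test decides the IN result by comparing `COUNT(*)` with `COUNT(a)`. A simplified sketch of that decision, assuming `None` models SQL NULL and a non-NULL probe value, shows why one extra NULL row turns every `false` into `null`. The function `in` is an illustration, not the plan that Calcite actually generates.

```scala
object InViaCounts {
  // Evaluates `value IN (candidates)` under SQL three-valued logic:
  // Some(true), Some(false), or None for UNKNOWN (printed as null).
  def in(value: Int, candidates: Seq[Option[Int]]): Option[Boolean] = {
    val c = candidates.size                  // COUNT(*) over the subquery
    val ck = candidates.count(_.isDefined)   // COUNT(a) over the subquery
    if (candidates.contains(Some(value))) Some(true) // a match always wins
    else if (ck < c) None                    // no match, but a NULL is present
    else Some(false)                         // no match and no NULL
  }
}
```

Matching values are unaffected by the extra NULL row, which is consistent with the six `true` results surviving in the received output.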


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160271966
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1130,259 +1134,263 @@ object AggregateUtil {
         // create aggregate function instances by function type and aggregate field data type.
         aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
           val argList: util.List[Integer] = aggregateCall.getArgList
    -      if (argList.isEmpty) {
    -        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    -          aggFieldIndexes(index) = Array[Int](0)
    --- End diff --
    
    Couldn't we just change this line to `aggFieldIndexes(index) = Array[Int](-1)` instead of touching each line of this method?


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160268601
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1130,259 +1134,263 @@ object AggregateUtil {
         // create aggregate function instances by function type and aggregate field data type.
         aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
           val argList: util.List[Integer] = aggregateCall.getArgList
    -      if (argList.isEmpty) {
    -        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    -          aggFieldIndexes(index) = Array[Int](0)
    -        } else {
    -          throw new TableException("Aggregate fields should not be empty.")
    +
    +      if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +        aggregates(index) = new CountAggFunction()
    +        if(argList.isEmpty) {
    +          aggFieldIndexes(index) = Array[Int](-1)
    +        }else{
    --- End diff --
    
    add spaces


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160416641
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    I think this test case exposes a bug in the batch code path, so we should re-enable it after FLINK-8355 is fixed.
    You are right that we are not sure how to fix FLINK-8355 at the moment, but we are sure that adding a `null` row is not the correct (or best) way to deal with a non-grouped query. I think we should fix FLINK-8355 after this PR is merged (once we know how to fix it). What do you think?


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160250423
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -121,7 +121,7 @@ class AggregationCodeGenerator(
     
         // get parameter lists for aggregation functions
         val parametersCode = aggFields.map { inFields =>
    -      val fields = inFields.map { f =>
    +      val fields =  inFields.filter(index => index > -1).map { f =>
    --- End diff --
    
    remove double space


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160442170
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala ---
    @@ -254,7 +254,7 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
     
    -    val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
    --- End diff --
    
    Sounds good.  


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160438113
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala ---
    @@ -254,7 +254,7 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
     
    -    val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
    --- End diff --
    
    The test was working correctly before. If you remove all your changes, the test will pass. It started to fail when you added the count aggregate that is not null-aware. It does not matter which rules are applied during optimization as long as the query computes the correct result for the given input data. The test is correct and is failing due to the changes of the PR. Hence, the PR breaks the correctness of some queries that were working before.
    
    IMO, we cannot merge it in the current state. We have to fix FLINK-8355 before we can merge this PR. I have an idea how to fix FLINK-8355 and will add that to the ticket.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160439631
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    The test was passing before only because both `COUNT(*)` and `COUNT(a)` ignored the `NULL` before this PR. That's a bug. Please see SQL 92: https://docs.microsoft.com/en-us/sql/t-sql/functions/count-transact-sql 
    ```
    COUNT(*) returns the number of rows in a specified table without getting rid of duplicates. It counts each row separately. This includes rows that contain null values.
    ```


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162639253
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/CountAggFunction.scala ---
    @@ -33,7 +33,20 @@ class CountAccumulator extends JTuple1[Long] {
     /**
       * built-in count aggregate function
       */
    -class CountAggFunction extends AggregateFunction[JLong, CountAccumulator] {
    +class CountAggFunction
    +  extends AggregateFunction[JLong, CountAccumulator] {
    +
    +  // process argument is optimized by Calcite.
    +  // For instance count(42) or count(*) which will optimized to count().
    --- End diff --
    
    `For instance count(42) or count(*) which will optimized to count().` -> 
    `For instance count(42) or count(*) will be optimized to count().`


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160257426
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala ---
    @@ -254,7 +254,7 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
     
    -    val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
    --- End diff --
    
    Why are you changing the query? Is it not working anymore?


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160268630
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1130,259 +1134,263 @@ object AggregateUtil {
         // create aggregate function instances by function type and aggregate field data type.
         aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
           val argList: util.List[Integer] = aggregateCall.getArgList
    -      if (argList.isEmpty) {
    -        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    -          aggFieldIndexes(index) = Array[Int](0)
    -        } else {
    -          throw new TableException("Aggregate fields should not be empty.")
    +
    +      if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +        aggregates(index) = new CountAggFunction()
    +        if(argList.isEmpty) {
    --- End diff --
    
    add space


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160428673
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1130,259 +1134,263 @@ object AggregateUtil {
         // create aggregate function instances by function type and aggregate field data type.
         aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
           val argList: util.List[Integer] = aggregateCall.getArgList
    -      if (argList.isEmpty) {
    -        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    -          aggFieldIndexes(index) = Array[Int](0)
    --- End diff --
    
    I think it would work, but I do not want to handle `SqlCountAggFunction` twice, so I removed `SqlCountAggFunction` from the case condition. 


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160622372
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    No worries. I think if we fix FLINK-8355 first, the test cases should work correctly. I've added a comment to FLINK-8355 with an idea how to fix it.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160260046
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/GroupWindowITCase.scala ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.runtime.stream.sql.GroupWindowITCase.TimestampAndWatermarkWithOffset
    +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class GroupWindowITCase extends StreamingWithStateTestBase {
    --- End diff --
    
    Add these tests to `SqlITCase` instead of creating a new class. A new test class requires setting up a new mini cluster, which is quite expensive.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160267929
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -145,6 +145,14 @@ class DataStreamOverAggregate(
           inputSchema.typeInfo,
           Some(constants))
     
    +    val constantsTypeInfo =
    +      Some(constants).map(_.map(generator.generateExpression(_))).getOrElse(Seq()).map(_.resultType)
    +  val aggInputTypeInfo = constantsTypeInfo.++:(inputSchema.fieldTypeInfos)
    +
    +    val aggregateInputType =
    +      cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    --- End diff --
    
    Something like:
    
    ```
    val constantTypes = constants.map(_.getType)
    val fieldTypes = input.getRowType.getFieldList.asScala.map(_.getType)
    val aggInTypes = constantTypes ++ fieldTypes
    val aggInNames = aggInTypes.indices.map("f" + _)
    
    val aggInRowType = getCluster.getTypeFactory.createStructType(aggInTypes.asJava, aggInNames.asJava)
    ```



---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160250581
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -139,7 +139,7 @@ class AggregationCodeGenerator(
         val classes = UserDefinedFunctionUtils.typeInfoToClass(physicalInputTypes)
         val constantClasses = UserDefinedFunctionUtils.typeInfoToClass(constantTypes)
         val methodSignaturesList = aggFields.map { inFields =>
    -      inFields.map { f =>
    +      inFields.filter(index => index > -1).map { f =>
    --- End diff --
    
    `filter(index => index > -1)` can be shortened to `filter(_ > -1)`
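
The two spellings are equivalent in Scala, which a small standalone check can confirm. In this sketch, `-1` is assumed to be the "no input field" marker that the diff assigns for an empty argument list; `explicit` and `short` are illustrative helper names.

```scala
object ArgIndexFilter {
  // Explicit lambda, as written in the diff.
  def explicit(inFields: Array[Int]): Array[Int] =
    inFields.filter(index => index > -1)

  // Placeholder syntax, as suggested in the review; same behaviour.
  def short(inFields: Array[Int]): Array[Int] =
    inFields.filter(_ > -1)
}
```

An index array that contains only the marker filters down to an empty array, which is what lets the no-argument `accumulate` signature be selected.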


---

[GitHub] flink issue #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supported

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5241
  
    Thanks for the update and rebasing @sunjincheng121.
    The PR looks good. Will run final tests and merge it.
    
    Best, Fabian


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160253454
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -359,12 +359,17 @@ class AggregationCodeGenerator(
     
           val accumulate: String = {
             for (i <- aggs.indices) yield {
    -          j"""
    -             |    ${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i);
    -             |    ${genDataViewFieldSetter(s"acc$i", i)}
    -             |    ${aggs(i)}.accumulate(
    -             |      acc$i,
    -             |      ${parametersCode(i)});""".stripMargin
    --- End diff --
    
    Change to 
    
    ```
     |    ${aggs(i)}.accumulate(
     |      acc$i
     |        ${if (!parametersCode(i).isEmpty) ","}
     |      ${parametersCode(i)}
     |    );""".stripMargin
    ```
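
The conditional comma can be sketched with plain Scala string interpolation. One caveat, stated for the standard `s` interpolator (the `j` interpolator in the diff may behave differently): an `if` without an `else` yields the unit value when the condition is false, and that renders as `()` in the output, so this sketch spells out an empty `else` branch. `accumulateCall` is a hypothetical helper, not part of `AggregationCodeGenerator`.

```scala
object AccumulateCallGen {
  // Renders one accumulate(...) call. `params` is the already generated
  // argument code and may be empty, e.g. for COUNT(*).
  def accumulateCall(agg: String, acc: String, params: String): String = {
    // Emit the separator only when there is something to separate.
    val separator = if (params.nonEmpty) ", " else ""
    s"$agg.accumulate($acc$separator$params);"
  }
}
```

For an empty parameter list this produces a call with only the accumulator argument, with no dangling comma.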
      


---

[GitHub] flink issue #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supported

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/5241
  
    @fhueske Thanks for your review and your suggestion about FLINK-8355. I have fixed FLINK-8355 in this PR. I would appreciate it if you could review the changes again.
    
    Best, Jincheng


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160250511
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -121,7 +121,7 @@ class AggregationCodeGenerator(
     
         // get parameter lists for aggregation functions
         val parametersCode = aggFields.map { inFields =>
    -      val fields = inFields.map { f =>
    +      val fields =  inFields.filter(index => index > -1).map { f =>
    --- End diff --
    
    `filter(index => index > -1)` can be shortened to `filter(_ > -1)`
      


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160260174
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/GroupWindowITCase.scala ---
    @@ -0,0 +1,122 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.stream.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.streaming.api.watermark.Watermark
    +import org.apache.flink.table.api.TableEnvironment
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.runtime.stream.sql.GroupWindowITCase.TimestampAndWatermarkWithOffset
    +import org.apache.flink.table.runtime.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit._
    +
    +import scala.collection.mutable
    +
    +class GroupWindowITCase extends StreamingWithStateTestBase {
    +
    +  val data = List(
    +    (1000L, "1", "Hello"),
    +    (2000L, "2", "Hello"),
    +    (3000L, null.asInstanceOf[String], "Hello"),
    +    (4000L, "4", "Hello"),
    +    (5000L, null.asInstanceOf[String], "Hello"),
    +    (6000L, "6", "Hello"),
    +    (7000L, "7", "Hello World"),
    +    (8000L, "8", "Hello World"),
    +    (20000L, "20", "Hello World"))
    +
    +  @Test
    +  def testRowTimeTumbleWindow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    StreamITCase.clear
    +    env.setParallelism(1)
    +
    +    val stream = env
    +      .fromCollection(data)
    +      .assignTimestampsAndWatermarks(
    +        new TimestampAndWatermarkWithOffset[(Long, String, String)](0L))
    +    val table = stream.toTable(tEnv, 'a, 'b, 'c, 'rowtime.rowtime)
    +
    +    tEnv.registerTable("T1", table)
    +
    +    val sqlQuery = "SELECT c, COUNT(*), COUNT(1), COUNT(b) FROM T1 " +
    +      "GROUP BY TUMBLE(rowtime, interval '5' SECOND), c"
    +
    +    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
    +    result.addSink(new StreamITCase.StringSink[Row])
    +    env.execute()
    +
    +    val expected = List("Hello World,2,2,2", "Hello World,1,1,1", "Hello,4,4,3", "Hello,2,2,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  @Test
    +  def testUnboundedGroupWindow(): Unit = {
    --- End diff --
    
    Rename to `testNonWindowedCount()`
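The expected rows of `testRowTimeTumbleWindow` in the diff above can be cross-checked without Flink. The following is only a sketch under simplifying assumptions: it reuses the test's `data` list, assigns each row to a 5-second window by integer division of its timestamp, and treats `COUNT(*)` and `COUNT(1)` as plain row counts while `COUNT(b)` skips rows where `b` is null. The object name is hypothetical.

```scala
object TumbleCountSketch {
  // Same rows as the `data` list in the test: (rowtime in ms, b, c).
  val data: List[(Long, String, String)] = List(
    (1000L, "1", "Hello"),
    (2000L, "2", "Hello"),
    (3000L, null.asInstanceOf[String], "Hello"),
    (4000L, "4", "Hello"),
    (5000L, null.asInstanceOf[String], "Hello"),
    (6000L, "6", "Hello"),
    (7000L, "7", "Hello World"),
    (8000L, "8", "Hello World"),
    (20000L, "20", "Hello World"))

  // One output line "c,COUNT(*),COUNT(1),COUNT(b)" per (window, c) group.
  val counts: List[String] = data
    .groupBy { case (ts, _, c) => (ts / 5000L, c) } // 5-second tumbling window index
    .values
    .map { rows =>
      val c = rows.head._3
      val countStar = rows.size               // COUNT(*) and COUNT(1): every row
      val countB = rows.count(_._2 != null)   // COUNT(b): only rows with non-null b
      s"$c,$countStar,$countStar,$countB"
    }
    .toList
    .sorted
}
```

Under these assumptions the sketch reproduces the four rows in the test's `expected` list.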


---

[GitHub] flink issue #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supported

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/5241
  
    Thanks @fhueske, I have rebased the code. I would appreciate it if you could review the changes.
    
    Best, Jincheng


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160442930
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    It is correct that the current state does not work for all queries and input data. There is a bug that needs to be fixed. However, it is working correctly for the *given* test.
    
    The test is correct and is now failing due to this PR. Hence it causes a regression. Your changes fix a bug but introduce another one.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160253551
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -383,12 +388,16 @@ class AggregationCodeGenerator(
     
           val retract: String = {
             for (i <- aggs.indices) yield {
    -          j"""
    -             |    ${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i);
    -             |    ${genDataViewFieldSetter(s"acc$i", i)}
    -             |    ${aggs(i)}.retract(
    -             |      acc$i,
    -             |      ${parametersCode(i)});""".stripMargin
    --- End diff --
    
    same as for accumulate


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160255883
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala ---
    @@ -146,14 +146,31 @@ abstract class AggFunctionTestBase[T, ACC] {
         val accumulator = aggregator.createAccumulator()
         vals.foreach(
           v =>
    -        accumulateFunc.invoke(aggregator, accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
    +        if(accumulateFunc.getParameterCount == 1){
    +          this.accumulateFunc.invoke(aggregator, accumulator.asInstanceOf[Object])
    +        }else{
    --- End diff --
    
    add spaces `} else {`


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160420917
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/JoinITCase.scala ---
    @@ -254,7 +254,7 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2, 'a3)
         tEnv.registerTable("A", table)
     
    -    val sqlQuery2 = "SELECT * FROM (SELECT count(*) FROM A) CROSS JOIN A"
    --- End diff --
    
    The current result is correct. `CollectionDataSets.getSmall3TupleDataSet` has three rows. Hence `SELECT count(a1) FROM A` must be `3` and not `4`. 
    The PR breaks the correctness of the test.
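The difference between `3` and `4` can be illustrated with a small sketch. It assumes, as the Scaladoc quoted elsewhere in this thread describes, that an extra all-NULL row is unioned into the input of a non-grouped aggregate. The three values and all names below are hypothetical; `None` plays the role of SQL NULL.

```scala
object NullRowSketch {
  // Three input rows for column a1; the concrete values are made up.
  val input: Seq[Option[Int]] = Seq(Some(1), Some(2), Some(3))

  // The input after an extra all-NULL row has been added.
  val withNullRow: Seq[Option[Int]] = input :+ None

  // COUNT(a1) skips NULLs, so the extra row does not change it.
  val countColumn: Int = withNullRow.count(_.isDefined)

  // A COUNT(*) that simply counts rows also counts the extra NULL row.
  val countStar: Int = withNullRow.size
}
```

In this sketch `countColumn` stays at 3 while `countStar` becomes 4, which is the off-by-one discussed in the comment.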


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5241


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160437205
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    The rule or anything else that happens during optimization does not matter. The only things that count are the input data, the query, and the output data. Given the query and the input data, the current result is correct. 
    
    The test was passing before. If it fails now, it is clear that the PR broke it.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160413762
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala ---
    @@ -359,12 +359,17 @@ class AggregationCodeGenerator(
     
           val accumulate: String = {
             for (i <- aggs.indices) yield {
    -          j"""
    -             |    ${accTypes(i)} acc$i = (${accTypes(i)}) accs.getField($i);
    -             |    ${genDataViewFieldSetter(s"acc$i", i)}
    -             |    ${aggs(i)}.accumulate(
    -             |      acc$i,
    -             |      ${parametersCode(i)});""".stripMargin
    --- End diff --
    
    The code is very simple, but when the condition `!parametersCode(i).isEmpty` is false, it will generate the following code: 
    `.accumulate(acc0());`, which we do not want.
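The stray `()` comes from Scala itself: an `if` without an `else` evaluates to the unit value `()` when its condition is false, and string interpolation renders that value as the text `()`. The sketch below reproduces the effect with the standard `s` interpolator instead of Flink's `j` interpolator, and shows one possible fix using an explicit `else ""`. The object and method names, and the literal `agg` and `acc0`, are hypothetical.

```scala
object AccumulateCallSketch {
  // Mirrors the suggested template: a one-armed `if` inside the interpolation.
  def oneArmedIf(params: String): String =
    s"""agg.accumulate(acc0${if (params.nonEmpty) ","}$params);"""

  // Possible fix: an explicit `else ""` keeps the interpolated value a String.
  def withElse(params: String): String =
    s"""agg.accumulate(acc0${if (params.nonEmpty) ", " else ""}$params);"""
}
```

With an empty parameter list, `oneArmedIf("")` yields the unwanted `agg.accumulate(acc0());`, whereas `withElse("")` yields `agg.accumulate(acc0);`.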


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162644529
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/DataSetEmptyProcessMapPartition.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.runtime.aggregate
    +
    +import java.lang
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction
    +import org.apache.flink.configuration.Configuration
    +import org.apache.flink.table.codegen.{Compiler, GeneratedAggregationsFunction}
    +import org.apache.flink.table.util.Logging
    +import org.apache.flink.types.Row
    +import org.apache.flink.util.Collector
    +
    +/**
    +  * Set default value of all aggregates for non-grouped aggregates on empty dataset.
    +  */
    +class DataSetEmptyProcessMapPartition(genAggregations: GeneratedAggregationsFunction)
    --- End diff --
    
    Can be removed if we extend `DataSetFinalAggFunction` to also implement `MapPartitionFunction`


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160413930
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/datastream/DataStreamOverAggregate.scala ---
    @@ -145,6 +145,14 @@ class DataStreamOverAggregate(
           inputSchema.typeInfo,
           Some(constants))
     
    +    val constantsTypeInfo =
    +      Some(constants).map(_.map(generator.generateExpression(_))).getOrElse(Seq()).map(_.resultType)
    +  val aggInputTypeInfo = constantsTypeInfo.++:(inputSchema.fieldTypeInfos)
    +
    +    val aggregateInputType =
    +      cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]
    --- End diff --
    
    +1


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162643053
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala ---
    @@ -164,7 +164,6 @@ object FlinkRuleSets {
         // translate to Flink DataSet nodes
         DataSetWindowAggregateRule.INSTANCE,
         DataSetAggregateRule.INSTANCE,
    -    DataSetAggregateWithNullValuesRule.INSTANCE,
    --- End diff --
    
    Please move the changes to remove this rule into a separate PR.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162645683
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
    @@ -157,6 +158,7 @@ class DataSetAggregate(
           } else {
             inputDS
               .reduceGroup(finalAgg)
    +          .mapPartition(emptyProcessMapPartition.get)
    --- End diff --
    
    We should implement the pre-aggregated non-grouped agg also with `mapPartition` then, IMO.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160419117
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    A workaround would be to create a `transformToBatchAggregateFunctions`, or to add a parameter to `transformToAggregateFunctions` that distinguishes batch from stream,
    but that is not elegant :(.


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160259534
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    This PR introduces a regression. Previously working queries will compute incorrect results when we merge it. 
    
    IMO, we should only merge it if all existing tests pass. The problem is that FLINK-8355 cannot be easily fixed. We need to add a NULL tuple, because Flink's DataSet API won't compute any result if the input is empty. Not sure how this can be resolved.
      


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162649999
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1130,259 +1137,263 @@ object AggregateUtil {
         // create aggregate function instances by function type and aggregate field data type.
         aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
           val argList: util.List[Integer] = aggregateCall.getArgList
    -      if (argList.isEmpty) {
    -        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    -          aggFieldIndexes(index) = Array[Int](0)
    -        } else {
    -          throw new TableException("Aggregate fields should not be empty.")
    +
    +      if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +        aggregates(index) = new CountAggFunction
    +        if(argList.isEmpty) {
    --- End diff --
    
    +space


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160255960
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/aggfunctions/AggFunctionTestBase.scala ---
    @@ -146,14 +146,31 @@ abstract class AggFunctionTestBase[T, ACC] {
         val accumulator = aggregator.createAccumulator()
         vals.foreach(
           v =>
    -        accumulateFunc.invoke(aggregator, accumulator.asInstanceOf[Object], v.asInstanceOf[Object])
    +        if(accumulateFunc.getParameterCount == 1){
    --- End diff --
    
    add spaces `if (accumulateFunc.getParameterCount == 1) {`


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r160421752
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/batch/sql/SetOperatorsITCase.scala ---
    @@ -278,7 +278,14 @@ class SetOperatorsITCase(
         TestBaseUtils.compareResultAsText(results.asJava, expected)
       }
     
    +  /**
    +    * This test will checks IN for NULLs based on whether COUNT (*) and COUNT (a) are equal. Due to
    +    * [[org.apache.flink.table.plan.rules.dataSet.DataSetAggregateWithNullValuesRule]] will
    +    * union a NULL row in to input DataSet for non-groupBy agg. That caused COUNT (*) and COUNT(a)
    +    * are not equal. So this test case ignored before FLINK-8355 be fixed.
    --- End diff --
    
    I had a look at the test and think that it is correct.  `SELECT a FROM Table3` returns `1, 2, 3` and `SELECT d FROM Table5` returns `1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5`. So the result of the query should be 6 times `true` and 9 times `false`.
    
    Which result would you expect?
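The 6/9 split can be checked with a few lines of Scala. This is only a sketch that takes the two value lists quoted in the comment as given and evaluates the membership test per row of `d`; it does not involve NULL handling or Flink. All names are hypothetical.

```scala
object InPredicateSketch {
  // Values quoted in the comment: result of `SELECT a FROM Table3`.
  val a: Set[Int] = Set(1, 2, 3)

  // Values quoted in the comment: result of `SELECT d FROM Table5`.
  val d: Seq[Int] = Seq(1, 2, 2, 3, 3, 3, 4, 4, 4, 4, 5, 5, 5, 5, 5)

  // One Boolean per row of d: is the value contained in a?
  val results: Seq[Boolean] = d.map(a.contains)

  val trueCount: Int = results.count(identity)
  val falseCount: Int = results.count(!_)
}
```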


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162649590
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/AggregateUtil.scala ---
    @@ -1130,259 +1137,263 @@ object AggregateUtil {
         // create aggregate function instances by function type and aggregate field data type.
         aggregateCalls.zipWithIndex.foreach { case (aggregateCall, index) =>
           val argList: util.List[Integer] = aggregateCall.getArgList
    -      if (argList.isEmpty) {
    -        if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    -          aggFieldIndexes(index) = Array[Int](0)
    -        } else {
    -          throw new TableException("Aggregate fields should not be empty.")
    +
    +      if (aggregateCall.getAggregation.isInstanceOf[SqlCountAggFunction]) {
    +        aggregates(index) = new CountAggFunction
    +        if(argList.isEmpty) {
    +          aggFieldIndexes(index) = Array[Int](-1)
    +        }else{
    --- End diff --
    
    +spaces


---

[GitHub] flink pull request #5241: [FLINK-8325][table] Add COUNT(*),COUNT(1) supporte...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5241#discussion_r162642637
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetAggregate.scala ---
    @@ -157,6 +158,7 @@ class DataSetAggregate(
           } else {
             inputDS
               .reduceGroup(finalAgg)
    +          .mapPartition(emptyProcessMapPartition.get)
    --- End diff --
    
    I thought about this again. 
    
    I think we should extend `DataSetFinalAggFunction` to also implement `MapPartitionFunction` similar to the `DataSetPreAggFunction`. The `GroupReduceFunction.reduce()` method and the `MapPartitionFunction.mapPartition()` method share the same code. For the grouped aggregation, we use `reduceGroup(function)` and for the non-grouped aggregation we use `mapPartition(function).setParallelism(1)`. Setting the parallelism is important here.
    
    That way we avoid an additional function and it's a cleaner design.
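A rough sketch of the proposed shape, under loudly stated assumptions: the three traits below are simplified stand-ins for Flink's `Collector`, `GroupReduceFunction` and `MapPartitionFunction` (they use Scala's `Iterable` so the example runs without Flink on the classpath), and `CountFinalAgg` is a hypothetical counting aggregate, not the real `DataSetFinalAggFunction`. It only shows how one class can serve both code paths through a shared private method.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins for Flink's interfaces (assumption: not the real signatures).
trait Collector[T] { def collect(record: T): Unit }
trait GroupReduceFunction[IN, OUT] {
  def reduce(values: Iterable[IN], out: Collector[OUT]): Unit
}
trait MapPartitionFunction[IN, OUT] {
  def mapPartition(values: Iterable[IN], out: Collector[OUT]): Unit
}

// Hypothetical final aggregation that counts its input rows.
class CountFinalAgg
    extends GroupReduceFunction[String, Long]
    with MapPartitionFunction[String, Long] {

  // Path for the grouped aggregation.
  override def reduce(values: Iterable[String], out: Collector[Long]): Unit =
    aggregate(values, out)

  // Path for the non-grouped aggregation.
  override def mapPartition(values: Iterable[String], out: Collector[Long]): Unit =
    aggregate(values, out)

  // Shared logic: emits a count even when `values` is empty.
  private def aggregate(values: Iterable[String], out: Collector[Long]): Unit =
    out.collect(values.size.toLong)
}

// Small helper that buffers emitted records so they can be inspected.
class BufferCollector[T] extends Collector[T] {
  val records: ArrayBuffer[T] = ArrayBuffer.empty[T]
  override def collect(record: T): Unit = records += record
}
```

Calling `mapPartition` with an empty input on this sketch still emits a single `0`, which is the default-value behaviour the separate `DataSetEmptyProcessMapPartition` class was added for. The sketch does not model how Flink's runtime decides when to invoke either method.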


---