Posted to issues@flink.apache.org by sunjincheng121 <gi...@git.apache.org> on 2017/05/09 09:27:23 UTC

[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3851

    [FLINK-6462] [table] Add requiresOver interface for AggregateFunction

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
    If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html).
    In addition to going through the list, please provide a meaningful description of your changes.
    
    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-6462] [table] Add requiresOver interface for AggregateFunction ")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    
    - [x] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-6462-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3851.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3851
    
----
commit 9a474ebccd13f5a394c77a7e4d701fc16a3772f2
Author: sunjincheng121 <su...@gmail.com>
Date:   2017-05-08T04:04:47Z

    [FLINK-6462] [table] Add requiresOver interface for AggregateFunction

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116217338
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/AggregationsTest.scala ---
    @@ -0,0 +1,38 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.api.scala.batch.table
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.api.TableException
    +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
    +import org.apache.flink.table.utils.TableTestBase
    +import org.junit.Test
    +
    +class AggregationsTest extends TableTestBase {
    +
    +  @Test(expected = classOf[TableException])
    --- End diff --
    
    This test can be moved to `AggregationsValidationTest`.



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116211558
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
    @@ -53,8 +54,17 @@ class DataSetAggregateRule
     
         // check if we have distinct aggregates
         val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    +    if (distinctAggs) {
    +      throw TableException("DISTINCT aggregates are currently not supported.")
    +    }
    +
    +    // check if we have over aggregates
    +    val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
    +    if (overAggs) {
    +      throw TableException("OVER clause is necessary for requires over window functions")
    --- End diff --
    
    Same for the other checks



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116211361
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
    @@ -53,8 +54,17 @@ class DataSetAggregateRule
     
         // check if we have distinct aggregates
         val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    +    if (distinctAggs) {
    +      throw TableException("DISTINCT aggregates are currently not supported.")
    +    }
    +
    +    // check if we have over aggregates
    +    val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
    +    if (overAggs) {
    +      throw TableException("OVER clause is necessary for requires over window functions")
    --- End diff --
    
    The error message should list the aggregation functions that require OVER



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116592490
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -215,14 +215,22 @@ case class Aggregate(
       }
     
       override def validate(tableEnv: TableEnvironment): LogicalNode = {
    -
    +    implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
         val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
         val groupingExprs = resolvedAggregate.groupingExpressions
         val aggregateExprs = resolvedAggregate.aggregateExpressions
         aggregateExprs.foreach(validateAggregateExpression)
         groupingExprs.foreach(validateGroupingExpression)
     
         def validateAggregateExpression(expr: Expression): Unit = expr match {
    +      case Alias(child, _, _) => validateAggregateExpression(child)
    --- End diff --
    
    Please rename `child`: `Aggregate` has a member `child`, so reusing the name here might be confusing.



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116592552
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -584,13 +592,22 @@ case class WindowAggregate(
       }
     
       override def validate(tableEnv: TableEnvironment): LogicalNode = {
    +    implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
         val resolvedWindowAggregate = super.validate(tableEnv).asInstanceOf[WindowAggregate]
         val groupingExprs = resolvedWindowAggregate.groupingExpressions
         val aggregateExprs = resolvedWindowAggregate.aggregateExpressions
         aggregateExprs.foreach(validateAggregateExpression)
         groupingExprs.foreach(validateGroupingExpression)
     
         def validateAggregateExpression(expr: Expression): Unit = expr match {
    +      case Alias(child, _, _) => validateAggregateExpression(child)
    --- End diff --
    
    Same comments as for `Aggregate` apply here.



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116218286
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/AggregationsTest.scala ---
    @@ -0,0 +1,41 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.table.api.scala.batch.sql
    +
    +import org.apache.flink.api.scala._
    +import org.apache.flink.table.api.ValidationException
    +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.OverAgg0
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.table.utils.TableTestBase
    +import org.junit.Test
    +
    +class AggregationsTest extends TableTestBase {
    +
    +  @Test(expected = classOf[ValidationException])
    --- End diff --
    
    Please add a brief comment to each test why it is expected to fail.



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116211438
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
    @@ -53,8 +54,17 @@ class DataSetAggregateRule
     
         // check if we have distinct aggregates
         val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    +    if (distinctAggs) {
    +      throw TableException("DISTINCT aggregates are currently not supported.")
    +    }
    +
    +    // check if we have over aggregates
    +    val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
    --- End diff --
    
    use `filter` instead of `exists` to extract the functions that require OVER.
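
A minimal sketch of this suggestion, not the code from the PR. `AggCall` and `OverCheckSketch` are hypothetical stand-ins (they are not Calcite's `AggregateCall` or any Flink class), and the message wording is only an assumption. It shows why `filter` helps: `exists` yields a bare Boolean, while `filter` keeps the offending calls so their names can be listed in the error message.

```scala
// Hypothetical stand-in for an aggregate call; not Calcite's AggregateCall.
final case class AggCall(name: String, requiresOver: Boolean)

object OverCheckSketch {
  // `exists` only answers yes/no, so the message cannot name the offenders.
  def hasOverAggs(calls: Seq[AggCall]): Boolean = calls.exists(_.requiresOver)

  // `filter` keeps the offending calls, so their names can go into the message.
  def overAggError(calls: Seq[AggCall]): Option[String] = {
    val names = calls.filter(_.requiresOver).map(_.name)
    if (names.isEmpty) {
      None
    } else {
      val joined = names.mkString(", ")
      Some(s"OVER clause is necessary for window functions: [$joined].")
    }
  }

  def main(args: Array[String]): Unit = {
    val calls = Seq(
      AggCall("sum", requiresOver = false),
      AggCall("overAgg", requiresOver = true))
    println(overAggError(calls).getOrElse("no OVER-only aggregates"))
  }
}
```

Returning an `Option[String]` here is only a convenience for the sketch; in a real check the caller would decide how to report the message.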



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116592899
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -215,14 +215,22 @@ case class Aggregate(
       }
     
       override def validate(tableEnv: TableEnvironment): LogicalNode = {
    -
    +    implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
         val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
         val groupingExprs = resolvedAggregate.groupingExpressions
         val aggregateExprs = resolvedAggregate.aggregateExpressions
         aggregateExprs.foreach(validateAggregateExpression)
         groupingExprs.foreach(validateGroupingExpression)
     
         def validateAggregateExpression(expr: Expression): Unit = expr match {
    +      case Alias(child, _, _) => validateAggregateExpression(child)
    --- End diff --
    
    Isn't this case caught by the last case `case e => e.children.foreach(validateAggregateExpression)`?
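
To illustrate the question, here is a small sketch on a hypothetical expression tree. The types `Expr`, `Field`, `Alias`, and `AggCall` below are invented for the example and are not Flink's classes. It assumes an alias exposes its wrapped expression through `children`; under that assumption the generic fallback case already descends into aliases and a dedicated `Alias` case adds nothing.

```scala
object AliasTraversalSketch {
  // Hypothetical miniature expression tree; names are illustrative only.
  sealed trait Expr { def children: Seq[Expr] }

  final case class Field(name: String) extends Expr {
    val children: Seq[Expr] = Nil
  }

  final case class Alias(inner: Expr, name: String) extends Expr {
    val children: Seq[Expr] = Seq(inner)
  }

  final case class AggCall(name: String, requiresOver: Boolean, args: Seq[Expr]) extends Expr {
    val children: Seq[Expr] = args
  }

  // No dedicated Alias case: the fallback recurses into every node's children,
  // so an aggregate wrapped in an alias is still reached.
  def validate(expr: Expr): Unit = expr match {
    case AggCall(name, true, _) =>
      throw new IllegalArgumentException(
        s"OVER clause is necessary for window function: $name")
    case e =>
      e.children.foreach(validate)
  }
}
```

With these definitions, validating `Alias(AggCall("overAgg", true, Nil), "x")` fails in the `AggCall` case even though there is no explicit `Alias` case.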



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116212331
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupWindowAggregateRule.scala ---
    @@ -44,14 +44,19 @@ class DataStreamGroupWindowAggregateRule
         if (distinctAggs) {
           throw TableException("DISTINCT aggregates are currently not supported.")
         }
    +    // check if we have over aggregates
    +    val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
    +    if (overAggs) {
    +      throw TableException("OVER clause is necessary for requires over window functions")
    +    }
     
         // check if we have grouping sets
         val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
         if (groupSets || agg.indicator) {
           throw TableException("GROUPING SETS are currently not supported.")
         }
     
    -    !distinctAggs && !groupSets && !agg.indicator
    --- End diff --
    
    This check must not be removed.




[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116593574
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/logical/operators.scala ---
    @@ -215,14 +215,22 @@ case class Aggregate(
       }
     
       override def validate(tableEnv: TableEnvironment): LogicalNode = {
    -
    +    implicit val relBuilder: RelBuilder = tableEnv.getRelBuilder
         val resolvedAggregate = super.validate(tableEnv).asInstanceOf[Aggregate]
         val groupingExprs = resolvedAggregate.groupingExpressions
         val aggregateExprs = resolvedAggregate.aggregateExpressions
         aggregateExprs.foreach(validateAggregateExpression)
         groupingExprs.foreach(validateGroupingExpression)
     
         def validateAggregateExpression(expr: Expression): Unit = expr match {
    +      case Alias(child, _, _) => validateAggregateExpression(child)
    +        // check user-defined aggregate function
    +      case AggFunctionCall(agg, _) if agg.requiresOver =>
    --- End diff --
    
    Check this case also as 
    ```
    case aggExpr: Aggregation if aggExpr.getSqlAggFunction.requiresOver =>
    ```
    
    in case we add built-in Aggregations that require Over.



[GitHub] flink issue #3851: [FLINK-6462] [table] Add requiresOver interface for Aggre...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3851
  
    Hi @fhueske, thanks for the review.
    I forgot to run `mvn verify` the first time. Sorry about that. :(
    I have updated the PR according to your comments, except for one:
    moving `scala/batch/table/AggregationsTest.scala#testOverAggregation` to `AggregationsValidationTest`. `AggregationsTest` has to extend `TableTestBase`, whereas `AggregationsValidationTest` does not.
    What do you think?
    
    Thanks,
    SunJincheng



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116215873
  
    --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/api/java/utils/UserDefinedAggFunctions.java ---
    @@ -23,13 +23,37 @@
     import java.util.Iterator;
     
     public class UserDefinedAggFunctions {
    +    // Accumulator for test requiresOver
    +    public static class Accumulator0 extends Tuple2<Long, Integer>{}
     
         // Accumulator for WeightedAvg
         public static class WeightedAvgAccum extends Tuple2<Long, Integer> {
             public long sum = 0;
             public int count = 0;
         }
     
    +    // Test for requiresOver
    +    public static class OverAgg0 extends AggregateFunction<Long, Accumulator0> {
    --- End diff --
    
    Please reorder the classes so that the `AggregateFunction` class follows its accumulator class.



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116211695
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetWindowAggregateRule.scala ---
    @@ -49,7 +49,13 @@ class DataSetWindowAggregateRule
           throw TableException("GROUPING SETS are currently not supported.")
         }
     
    -    !distinctAggs && !groupSets && !agg.indicator
    --- End diff --
    
    This check must not be removed. Instead we should return with `false` from `matches`



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116212002
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
    @@ -53,8 +54,17 @@ class DataSetAggregateRule
     
         // check if we have distinct aggregates
         val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    +    if (distinctAggs) {
    +      throw TableException("DISTINCT aggregates are currently not supported.")
    --- End diff --
    
    We should not throw an exception here, but return with `false` from the `matches()` call.
    Otherwise the optimization terminates, although the query could be rewritten without distinct aggregates.
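
A simplified sketch of the reasoning, under loud assumptions: `Agg`, `Rule`, and both rule objects are hypothetical and do not mirror Calcite's `RelOptRule` API, and a real planner does not simply try rules in list order. The point it illustrates is that a rule which declines by returning `false` leaves room for another rule to handle the node, whereas throwing would end the search.

```scala
object RuleMatchSketch {
  // Hypothetical query node and rule interface; not Calcite's RelOptRule API.
  final case class Agg(distinct: Boolean)

  trait Rule {
    def name: String
    def matches(agg: Agg): Boolean
  }

  // Declines unsupported input by returning false instead of throwing.
  object PlainAggRule extends Rule {
    val name = "PlainAggRule"
    def matches(agg: Agg): Boolean = !agg.distinct
  }

  // Stands in for a rule that can handle (for example, rewrite) DISTINCT aggregates.
  object DistinctRewriteRule extends Rule {
    val name = "DistinctRewriteRule"
    def matches(agg: Agg): Boolean = agg.distinct
  }

  // Simplified planner loop: the first rule whose `matches` returns true wins.
  // If PlainAggRule threw on DISTINCT input, later rules would never be consulted.
  def firstMatching(rules: Seq[Rule], agg: Agg): Option[String] =
    rules.find(_.matches(agg)).map(_.name)
}
```

For a DISTINCT aggregate, `firstMatching(Seq(PlainAggRule, DistinctRewriteRule), Agg(distinct = true))` skips the first rule and selects the second.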



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/3851



[GitHub] flink issue #3851: [FLINK-6462] [table] Add requiresOver interface for Aggre...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3851
  
    Thanks for the review, @fhueske. Moving the check logic from the `rule` into the `Table API validation` phase makes sense to me: `Table API validation` ensures the `Semantics`, and the `rule` ensures that the `Runtime` can deal with the situation. The PR is updated. :)
    
    Thanks,
    SunJincheng



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116595353
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
    @@ -31,6 +31,17 @@ class WindowAggregateTest extends TableTestBase {
       streamUtil.addTable[(Int, String, Long)](
         "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
     
    +  /**
    +    * OVER clause is necessary for [[OverAgg0]] window function.
    +    */
    +  @Test(expected = classOf[ValidationException])
    +  def testOverAggregation() :Unit = {
    --- End diff --
    
    `testOverAggregation(): Unit = {`



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116442782
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateRule.scala ---
    @@ -50,6 +51,13 @@ class DataSetAggregateRule
           agg.getGroupSets.size() == 1) {
           return false
         }
    +    // check if we have over aggregates
    +    val overAggNames =
    --- End diff --
    
    I thought about this again. 
    The optimization rules are not the right place for this check and should not be modified. 
    
    Instead the check should be moved to the Table API validation phase. 
    SQL queries are automatically checked by Calcite's `SqlValidator`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116211870
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/dataSet/DataSetAggregateWithNullValuesRule.scala ---
    @@ -51,8 +52,17 @@ class DataSetAggregateWithNullValuesRule
     
         // check if we have distinct aggregates
         val distinctAggs = agg.getAggCallList.exists(_.isDistinct)
    +    if (distinctAggs) {
    +      throw TableException("DISTINCT aggregates are currently not supported.")
    --- End diff --
    
    We should not throw an exception here, but return with `false` from the `matches()` call.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116212175
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/DataStreamGroupAggregateRule.scala ---
    @@ -49,13 +49,19 @@ class DataStreamGroupAggregateRule
           throw TableException("DISTINCT aggregates are currently not supported.")
         }
     
    +    // check if we have over aggregates
    +    val overAggs = agg.getAggCallList.exists(_.getAggregation.requiresOver())
    +    if (overAggs) {
    +      throw TableException("OVER clause is necessary for requires over window functions")
    +    }
    +
         // check if we have grouping sets
         val groupSets = agg.getGroupSets.size() != 1 || agg.getGroupSets.get(0) != agg.getGroupSet
         if (groupSets || agg.indicator) {
           throw TableException("GROUPING SETS are currently not supported.")
         }
     
    -    !distinctAggs && !groupSets && !agg.indicator
    --- End diff --
    
    This check must not be removed.



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116444166
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/sql/WindowAggregateTest.scala ---
    @@ -31,6 +31,18 @@ class WindowAggregateTest extends TableTestBase {
       streamUtil.addTable[(Int, String, Long)](
         "MyTable", 'a, 'b, 'c, 'proctime.proctime, 'rowtime.rowtime)
     
    +  /**
    +    * OVER clause is necessary for [[OverAgg0]] window function.
    +    */
    +  @Test(expected = classOf[ValidationException])
    +  def testOverAggregation() = {
    +    streamUtil.addFunction("overAgg", new OverAgg0)
    +
    +    val sqlQuery = "SELECT overAgg(c, a) FROM MyTable"
    +
    +    streamUtil.verifySql(sqlQuery, "n/a")
    --- End diff --
    
    `streamUtil.tEnv.sql(sqlQuery)` is sufficient. We do not need to call the optimizer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink issue #3851: [FLINK-6462] [table] Add requiresOver interface for Aggre...

Posted by sunjincheng121 <gi...@git.apache.org>.
Github user sunjincheng121 commented on the issue:

    https://github.com/apache/flink/pull/3851
  
    Hi @fhueske, thank you very much.
    Honestly, I need to review my own changes more carefully, which would save other reviewers' time.
    Best,
    SunJincheng



[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116595446
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/table/validation/AggregationsValidationTest.scala ---
    @@ -20,13 +20,25 @@ package org.apache.flink.table.api.scala.batch.table.validation
     
     import org.apache.flink.api.scala._
     import org.apache.flink.api.scala.util.CollectionDataSets
    -import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.WeightedAvgWithMergeAndReset
    +import org.apache.flink.table.api.java.utils.UserDefinedAggFunctions.{OverAgg0, WeightedAvgWithMergeAndReset}
     import org.apache.flink.table.api.scala._
     import org.apache.flink.table.api.{TableEnvironment, ValidationException}
     import org.junit._
     
     class AggregationsValidationTest {
     
    +  /**
    +    * OVER clause is necessary for [[OverAgg0]] window function.
    +    */
    +  @Test(expected = classOf[ValidationException])
    +  def testOverAggregation(): Unit = {
    +    val env= ExecutionEnvironment.getExecutionEnvironment
    --- End diff --
    
    +space `val env = ExecutionEnvironment...`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] flink pull request #3851: [FLINK-6462] [table] Add requiresOver interface fo...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3851#discussion_r116443377
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/table/GroupWindowTest.scala ---
    @@ -30,6 +30,23 @@ import org.junit.{Ignore, Test}
     
     class GroupWindowTest extends TableTestBase {
     
    +  /**
    +    * OVER clause is necessary for [[OverAgg0]] window function.
    +    */
    +  @Test(expected = classOf[TableException])
    +  def testOverAggregation(): Unit = {
    +    val util = streamTestUtil()
    +    val table = util.addTable[(Long, Int, String)]('long, 'int, 'string, 'proctime.proctime)
    +
    +    val overAgg = new OverAgg0
    +    val windowedTable = table
    +      .window(Tumble over 2.rows on 'proctime as 'w)
    +      .groupBy('w, 'string)
    +      .select(overAgg('long, 'int))
    +
    +    util.verifyTable(windowedTable, "n/a")
    --- End diff --
    
    When the check is done in validation phase, we do not have to call `verifyTable()` (same for all other Table API tests).

