Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2018/03/15 21:55:08 UTC

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_...

GitHub user fhueske opened a pull request:

    https://github.com/apache/flink/pull/5706

    [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_SAMP, STDEV_POP functions on GROUP BY windows.

    ## What is the purpose of the change
    
    * Fixes the computation of `VAR_SAMP`, `VAR_POP`, `STDDEV_SAMP`, `STDDEV_POP` aggregations in the context of `GROUP BY` windows (`TUMBLE`, `HOP`, `SESSION`). Currently, these functions are computed as `AVG`.
    
    ## Brief change log
    
    * copy Calcite's `AggregateReduceFunctionsRule` to Flink and improve its extensibility
    * add a `WindowAggregateReduceFunctionsRule` based on the copied `AggregateReduceFunctionsRule` to decompose the faulty aggregation functions into `COUNT` and `SUM` functions.
    * add a restriction to `FlinkLogicalWindowAggregateConverter` to prevent translation of group window aggregates with aggregation functions that are not supported natively
    * prevent translation of `VAR_SAMP`, `VAR_POP`, `STDDEV_SAMP`, `STDDEV_POP` in `AggregateUtil`
    * add unit tests (plan validation) for batch (SQL, Table API) and stream (SQL, Table API)
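The decomposition into `COUNT` and `SUM` described in the change log can be illustrated with a small standalone sketch. It assumes the textbook identities that Calcite's `AggregateReduceFunctionsRule` also relies on: VAR_POP(x) = (SUM(x * x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x), VAR_SAMP divides by COUNT(x) - 1 instead, and the STDDEV variants take the square root. The object and method names are hypothetical, `Option` stands in for SQL NULL, and plain `Double`s stand in for Flink's SQL types.

```scala
// A minimal sketch, not Flink's implementation: VAR_POP, VAR_SAMP, STDDEV_POP and
// STDDEV_SAMP derived from the plain aggregates SUM(x), SUM(x * x) and COUNT(x).
object VarianceDecomposition {

  // The plain aggregates a group window would keep for one group.
  final case class Partial(sum: Double, sumSq: Double, count: Long) {
    // Combining two partials is plain addition, e.g. when merging panes.
    def merge(other: Partial): Partial =
      Partial(sum + other.sum, sumSq + other.sumSq, count + other.count)
  }

  // Accumulates SUM(x), SUM(x * x) and COUNT(x) over the input values.
  def accumulate(xs: Seq[Double]): Partial =
    xs.foldLeft(Partial(0.0, 0.0, 0L)) { (p, x) =>
      Partial(p.sum + x, p.sumSq + x * x, p.count + 1)
    }

  // SUM(x * x) - SUM(x) * SUM(x) / COUNT(x): the sum of squared deviations.
  private def squaredDeviations(p: Partial): Double =
    p.sumSq - p.sum * p.sum / p.count

  // VAR_POP divides by COUNT(x); None models NULL for an empty group.
  def varPop(p: Partial): Option[Double] =
    if (p.count == 0) None else Some(squaredDeviations(p) / p.count)

  // VAR_SAMP divides by COUNT(x) - 1; this sketch returns None for fewer than two rows.
  def varSamp(p: Partial): Option[Double] =
    if (p.count < 2) None else Some(squaredDeviations(p) / (p.count - 1))

  def stddevPop(p: Partial): Option[Double] = varPop(p).map(math.sqrt)
  def stddevSamp(p: Partial): Option[Double] = varSamp(p).map(math.sqrt)

  // What the affected functions were computed as before the fix: SUM(x) / COUNT(x).
  def avg(p: Partial): Option[Double] =
    if (p.count == 0) None else Some(p.sum / p.count)
}
```

For the sample 2, 4, 4, 4, 5, 5, 7, 9, `varPop` yields 4.0 and `stddevPop` yields 2.0, while `avg` yields 5.0, which shows how far off the old results could be. Keeping only additive partials is what makes the reduced form easy to evaluate in any window type; note, though, that this one-pass formula can lose precision when values are large relative to their spread.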
    
    ## Verifying this change
    
    * run the added plan tests
    
    ## Does this pull request potentially affect one of the following parts:
    
      - Dependencies (does it add or upgrade a dependency): **no**
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
      - The serializers: **no**
      - The runtime per-record code paths (performance sensitive): **no**
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
      - The S3 file system connector: **no**
    
    ## Documentation
    
      - Does this pull request introduce a new feature? **no**
      - If yes, how is the feature documented? **n/a**


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink tableVarStddevAggFix

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5706
    
----
commit 517567348b0ec0c23ef0c1dcc05c54a91d5c5671
Author: Fabian Hueske <fh...@...>
Date:   2018-03-15T20:04:00Z

    [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_SAMP, STDEV_POP functions on GROUP BY windows.

----


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175181100
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
    --- End diff --
    
    Replacing the current code with `SqlKind.AVG_AGG_FUNCTIONS.contains()` led to several test failures. These tests expected an `AVG` aggregation function, which was now replaced by `SUM / COUNT`.
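A small sketch makes the trade-off concrete by comparing a plain membership test with one that carves out `AVG`. It uses a hypothetical `Kind` type instead of Calcite's `SqlKind` so that it runs without Calcite, and it assumes that `SqlKind.AVG_AGG_FUNCTIONS` contains `AVG`, `STDDEV_POP`, `STDDEV_SAMP`, `VAR_POP`, and `VAR_SAMP` (worth double-checking against the Calcite version in use). The plain test also rejects `AVG` in the converter, so `AVG` is reduced to `SUM / COUNT`, which is what the existing plan tests did not expect.

```scala
// Sketch of the converter's "is this window aggregate supported?" predicate.
// `Kind` is a hypothetical stand-in for Calcite's SqlKind.
object WindowAggSupport {
  sealed trait Kind
  case object Avg extends Kind
  case object Sum extends Kind
  case object Count extends Kind
  case object StddevPop extends Kind
  case object StddevSamp extends Kind
  case object VarPop extends Kind
  case object VarSamp extends Kind

  // Assumed to mirror the members of SqlKind.AVG_AGG_FUNCTIONS.
  val avgAggFunctions: Set[Kind] = Set(Avg, StddevPop, StddevSamp, VarPop, VarSamp)

  // Plain membership test: rejects AVG too, forcing it through the reduce rule.
  def supportedNaive(kinds: Seq[Kind]): Boolean =
    kinds.forall(k => !avgAggFunctions.contains(k))

  // AVG has a native implementation; only the other AVG-family functions
  // must be reduced to SUM / COUNT before conversion.
  def supported(kinds: Seq[Kind]): Boolean = kinds.forall {
    case Avg                              => true
    case k if avgAggFunctions.contains(k) => false
    case _                                => true
  }
}
```

The carve-out corresponds to the shape the `matches` override takes in the later revision quoted in this thread.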


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175183811
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
    --- End diff --
    
    How about `SqlKind.AVG_AGG_FUNCTIONS.contains(kind) && kind != SqlKind.SUM && kind != SqlKind.AVG`?


---

[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5706
  
    Thanks for the feedback everybody!
    Will merge the PR tomorrow.


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175296744
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      // we support AVG
    +      case SqlKind.AVG => true
    +      // but none of the other AVG agg functions
    +      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
    +      case _ => true
    +    }
    +
    +    !agg.containsDistinctCall() && supported
    --- End diff --
    
    Shouldn't the logical rule support distinct calls here? It seems that previously the error was thrown in `DataSetWindowAggregateRule` and `DataStreamWindowAggregateRule`, respectively. Any chance we can add a unit test to further clarify this change?



---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r174965171
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
    --- End diff --
    
    How about AVG and SUM? They are also in `AggregateReduceFunctionsRule`. Also, I think it's better to use `SqlKind.AVG_AGG_FUNCTIONS.contains()` or `AggregateReduceFunctionsRule.isReducible()` (it's private now, though) in the case statement. That way, it stays consistent if Calcite changes.


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175056688
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
    --- End diff --
    
    We have a built-in function for AVG (which we don't really need anymore) and SUM, so we could translate such plans.
    But I agree, using SqlKind.AVG_AGG_FUNCTIONS.contains() is better.


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175825522
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      // we support AVG
    +      case SqlKind.AVG => true
    +      // but none of the other AVG agg functions
    +      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
    +      case _ => true
    +    }
    +
    +    !agg.containsDistinctCall() && supported
    --- End diff --
    
    Yes. This was kinda confusing to me, we should clean this up when adding DISTINCT support. Thanks for the update @fhueske 


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175230116
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
    --- End diff --
    
    sounds good to me. Will update the PR


---

[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5706
  
    updated PR


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_...

Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r174962566
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java ---
    @@ -0,0 +1,590 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.calcite.rel.rules;
    +
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelOptRule;
    +import org.apache.calcite.plan.RelOptRuleCall;
    +import org.apache.calcite.plan.RelOptRuleOperand;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rel.core.Aggregate;
    +import org.apache.calcite.rel.core.AggregateCall;
    +import org.apache.calcite.rel.core.RelFactories;
    +import org.apache.calcite.rel.logical.LogicalAggregate;
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeField;
    +import org.apache.calcite.rex.RexBuilder;
    +import org.apache.calcite.rex.RexLiteral;
    +import org.apache.calcite.rex.RexNode;
    +import org.apache.calcite.sql.SqlAggFunction;
    +import org.apache.calcite.sql.SqlKind;
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    +import org.apache.calcite.sql.type.SqlTypeUtil;
    +import org.apache.calcite.tools.RelBuilder;
    +import org.apache.calcite.tools.RelBuilderFactory;
    +import org.apache.calcite.util.CompositeList;
    +import org.apache.calcite.util.ImmutableIntList;
    +import org.apache.calcite.util.Util;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +
    +/*
    + * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
    --- End diff --
    
    I think we should create a Calcite JIRA to allow `AggregateReduceFunctionsRule` in Calcite to support this extension, reference the JIRA ticket here, and remove this copy once Calcite is upgraded. What do you think?


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5706


---

[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5706
  
    PR updated


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175055646
  
    --- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java ---
    @@ -0,0 +1,590 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to you under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.calcite.rel.rules;
    +
    +import org.apache.calcite.plan.RelOptCluster;
    +import org.apache.calcite.plan.RelOptRule;
    +import org.apache.calcite.plan.RelOptRuleCall;
    +import org.apache.calcite.plan.RelOptRuleOperand;
    +import org.apache.calcite.rel.RelNode;
    +import org.apache.calcite.rel.core.Aggregate;
    +import org.apache.calcite.rel.core.AggregateCall;
    +import org.apache.calcite.rel.core.RelFactories;
    +import org.apache.calcite.rel.logical.LogicalAggregate;
    +import org.apache.calcite.rel.type.RelDataType;
    +import org.apache.calcite.rel.type.RelDataTypeFactory;
    +import org.apache.calcite.rel.type.RelDataTypeField;
    +import org.apache.calcite.rex.RexBuilder;
    +import org.apache.calcite.rex.RexLiteral;
    +import org.apache.calcite.rex.RexNode;
    +import org.apache.calcite.sql.SqlAggFunction;
    +import org.apache.calcite.sql.SqlKind;
    +import org.apache.calcite.sql.fun.SqlStdOperatorTable;
    +import org.apache.calcite.sql.type.SqlTypeUtil;
    +import org.apache.calcite.tools.RelBuilder;
    +import org.apache.calcite.tools.RelBuilderFactory;
    +import org.apache.calcite.util.CompositeList;
    +import org.apache.calcite.util.ImmutableIntList;
    +import org.apache.calcite.util.Util;
    +
    +import com.google.common.collect.ImmutableList;
    +import com.google.common.collect.Lists;
    +import com.google.common.collect.Maps;
    +
    +import java.math.BigDecimal;
    +import java.util.ArrayList;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Map;
    +
    +/*
    + * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
    --- End diff --
    
    Yes, I agree. It would be much better to have this code in Calcite.
    However, the changes are very Flink-specific (we need to add a few fields to the projection).
    OTOH, it's just moving some code into a protected function, so there is no change in functionality and only a few lines are touched.
    
    I'll create a JIRA in Calcite and reference the issue. In case Calcite does not want the change, we can keep the class in Flink.
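Moving code into a protected method so that a subclass can add fields to the projection is an instance of the template-method pattern. The sketch below only illustrates that pattern: `ReduceRuleBase`, `WindowReduceRule`, `newProjection`, the string-based "plan", and the extra field names are all hypothetical and do not reflect the actual signatures in Calcite or Flink.

```scala
// Illustrative template-method sketch; every name here is hypothetical.
object ExtensibleRuleSketch {

  // A toy "plan": the expressions projected on top of the reduced aggregate.
  final case class Projection(exprs: List[String])

  class ReduceRuleBase {
    // Fixed rewrite skeleton: reduce each call, then build the projection via a hook.
    final def rewrite(aggCalls: List[String]): Projection =
      newProjection(aggCalls.map(reduce))

    // Toy reduction: AVG(x) becomes SUM(x) / COUNT(x); other calls are kept as-is.
    protected def reduce(call: String): String =
      if (call.startsWith("AVG(") && call.endsWith(")")) {
        val arg = call.substring(4, call.length - 1)
        s"SUM($arg) / COUNT($arg)"
      } else call

    // Hook: the base rule projects only the reduced expressions.
    protected def newProjection(exprs: List[String]): Projection =
      Projection(exprs)
  }

  // A window-specific subclass overrides only the hook to append extra fields.
  class WindowReduceRule(extraFields: List[String]) extends ReduceRuleBase {
    override protected def newProjection(exprs: List[String]): Projection =
      super.newProjection(exprs ++ extraFields)
  }
}
```

With this split, the reduction logic stays in one place and the subclass changes only the step it needs, which keeps the diff against the upstream class small.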


---

[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:

    https://github.com/apache/flink/pull/5706
  
    Updated the PR with the Calcite issue. 


---

[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5706#discussion_r175716765
  
    --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
    @@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
         FlinkConventions.LOGICAL,
         "FlinkLogicalWindowAggregateConverter") {
     
    +  override def matches(call: RelOptRuleCall): Boolean = {
    +    val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
    +
    +    // we do not support these functions natively
    +    // they have to be converted using the WindowAggregateReduceFunctionsRule
    +    val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
    +      // we support AVG
    +      case SqlKind.AVG => true
    +      // but none of the other AVG agg functions
    +      case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
    +      case _ => true
    +    }
    +
    +    !agg.containsDistinctCall() && supported
    --- End diff --
    
    Hmm, that's a good point. 
    In fact, there won't be any plan with a `DISTINCT` aggregation in a `LogicalWindowAggregate` because `LogicalWindowAggregateRule` prevents the translation of (`Calc(TUMBLE) -> Aggregate()`) into (`WindowAggregate(TUMBLE)`) if there is a distinct aggregate. This prevents window aggregates with `DISTINCT` aggregations in SQL queries from being translated into `WindowAggregate`. The Table API does not even have an API to define such queries.
    
    So, I'd simply remove the `containsDistinctCall()` check for now. We should definitely clean this up when we add support for DISTINCT aggregates.
    
    @walterddr, are you fine with this?



---