Posted to issues@flink.apache.org by fhueske <gi...@git.apache.org> on 2018/03/15 21:55:08 UTC
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_...
GitHub user fhueske opened a pull request:
https://github.com/apache/flink/pull/5706
[FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, STDDEV_POP functions on GROUP BY windows.
## What is the purpose of the change
* Fixes the computation of `VAR_SAMP`, `VAR_POP`, `STDDEV_SAMP`, `STDDEV_POP` aggregations in the context of `GROUP BY` windows (`TUMBLE`, `HOP`, `SESSION`). Currently, these functions are incorrectly computed as `AVG`.
## Brief change log
* copy Calcite's `AggregateReduceFunctionsRule` to Flink and improve its extensibility
* add a `WindowAggregateReduceFunctionsRule` based on the copied `AggregateReduceFunctionsRule` to decompose the faulty aggregation functions into `COUNT` and `SUM` functions.
* add a restriction to `FlinkLogicalWindowAggregateConverter` that prevents translation of group window aggregates with unsupported aggregation functions
* prevent translation of `VAR_SAMP`, `VAR_POP`, `STDDEV_SAMP`, `STDDEV_POP` in `AggregateUtil`
* add unit tests (plan validation) for batch (SQL, Table API) and stream (SQL, Table API)
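The decomposition in the second bullet relies on the standard identities that express variance and standard deviation through `SUM(x)`, `SUM(x * x)` and `COUNT(x)`. Below is a minimal Scala sketch of that arithmetic. It assumes plain `Double` values and uses `Option` in place of SQL `NULL`; the object and method names are illustrative, not Flink or Calcite API. It also shows why computing these functions as `AVG` gives wrong results.

```scala
// Illustrative sketch: variance/stddev expressed via SUM(x), SUM(x * x) and COUNT(x).
// Assumptions: Double arithmetic, Option instead of SQL NULL, hypothetical names.
object VarianceReduction {

  // The partial aggregates a reduction can compute with plain SUM/COUNT.
  final case class Partial(sum: Double, sumSq: Double, count: Long)

  def partial(xs: Seq[Double]): Partial =
    Partial(xs.sum, xs.map(x => x * x).sum, xs.size.toLong)

  // VAR_POP = (SUM(x*x) - SUM(x) * SUM(x) / COUNT(x)) / COUNT(x)
  def varPop(p: Partial): Option[Double] =
    if (p.count == 0) None
    else Some((p.sumSq - p.sum * p.sum / p.count) / p.count)

  // VAR_SAMP = (SUM(x*x) - SUM(x) * SUM(x) / COUNT(x)) / (COUNT(x) - 1); undefined for COUNT(x) <= 1
  def varSamp(p: Partial): Option[Double] =
    if (p.count <= 1) None
    else Some((p.sumSq - p.sum * p.sum / p.count) / (p.count - 1))

  def stddevPop(p: Partial): Option[Double] = varPop(p).map(math.sqrt)
  def stddevSamp(p: Partial): Option[Double] = varSamp(p).map(math.sqrt)

  // AVG = SUM(x) / COUNT(x): what the window aggregates computed before the fix.
  def avg(p: Partial): Option[Double] =
    if (p.count == 0) None else Some(p.sum / p.count)

  def main(args: Array[String]): Unit = {
    val p = partial(Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0))
    // prints "AVG=Some(5.0) VAR_POP=Some(4.0) STDDEV_POP=Some(2.0)"
    println(s"AVG=${avg(p)} VAR_POP=${varPop(p)} STDDEV_POP=${stddevPop(p)}")
  }
}
```

For the sample values `2, 4, 4, 4, 5, 5, 7, 9`, `AVG` is 5.0 while `VAR_POP` is 4.0, so a window that returned the `AVG` result for `VAR_POP` would be wrong.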
## Verifying this change
* run the added plan tests
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**
## Documentation
- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **n/a**
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/fhueske/flink tableVarStddevAggFix
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5706.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5706
----
commit 517567348b0ec0c23ef0c1dcc05c54a91d5c5671
Author: Fabian Hueske <fh...@...>
Date: 2018-03-15T20:04:00Z
[FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_SAMP, STDEV_POP functions on GROUP BY windows.
----
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175181100
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --
Replacing the current code with `SqlKind.AVG_AGG_FUNCTIONS.contains()` led to several test failures. These tests expected an `AVG` aggregation function, which was now being replaced by `SUM / COUNT`.
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175183811
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --
How about `SqlKind.AVG_AGG_FUNCTIONS.contains(kind) && kind != SqlKind.SUM && kind != SqlKind.AVG`?
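The variants of the check discussed in this thread can be compared with a small Scala model. `Kind` and its case objects are a hypothetical stand-in for Calcite's `SqlKind`. `AvgAggFunctions` assumes the same members as `SqlKind.AVG_AGG_FUNCTIONS` (`AVG`, `STDDEV_POP`, `STDDEV_SAMP`, `VAR_POP`, `VAR_SAMP`). The sketch shows why the plain `contains()` check also rejected `AVG`, and that the suggested guard accepts exactly the kinds the original explicit list accepts.

```scala
// Hypothetical stand-in for Calcite's SqlKind, restricted to the kinds relevant here.
object WindowAggSupportCheck {

  sealed trait Kind
  case object AVG extends Kind
  case object SUM extends Kind
  case object COUNT extends Kind
  case object VAR_POP extends Kind
  case object VAR_SAMP extends Kind
  case object STDDEV_POP extends Kind
  case object STDDEV_SAMP extends Kind

  // Assumed to mirror the members of SqlKind.AVG_AGG_FUNCTIONS.
  val AvgAggFunctions: Set[Kind] = Set(AVG, STDDEV_POP, STDDEV_SAMP, VAR_POP, VAR_SAMP)

  val allKinds: Seq[Kind] = Seq(AVG, SUM, COUNT, VAR_POP, VAR_SAMP, STDDEV_POP, STDDEV_SAMP)

  // Original check: an explicit list of unsupported kinds.
  def supportedExplicit(k: Kind): Boolean = k match {
    case STDDEV_POP | STDDEV_SAMP | VAR_POP | VAR_SAMP => false
    case _ => true
  }

  // Naive replacement: rejects AVG as well, so plans that expected AVG changed.
  def supportedNaive(k: Kind): Boolean = !AvgAggFunctions.contains(k)

  // Suggested guard: the expression is true for unsupported kinds, so negate it.
  def supportedSuggested(k: Kind): Boolean =
    !(AvgAggFunctions.contains(k) && k != SUM && k != AVG)

  def main(args: Array[String]): Unit =
    allKinds.foreach { k =>
      println(s"$k: explicit=${supportedExplicit(k)} naive=${supportedNaive(k)} suggested=${supportedSuggested(k)}")
    }
}
```

Under this assumed membership, `SUM` is not one of the AVG-style functions, so the `kind != SqlKind.SUM` part does not change the result today; it only matters if the set changes in Calcite.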
---
[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/5706
Thanks for the feedback everybody!
Will merge the PR tomorrow.
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175296744
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ // we support AVG
+ case SqlKind.AVG => true
+ // but none of the other AVG agg functions
+ case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+ case _ => true
+ }
+
+ !agg.containsDistinctCall() && supported
--- End diff --
Shouldn't the logical rule support distinct calls here? It seems like previously the error was thrown in the `DataSetWindowAggregateRule` and `DataStreamWindowAggregateRule`, respectively. Any chance we can add a unit test to further clarify this change?
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r174965171
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --
How about `AVG` and `SUM`? They are also handled in `AggregateReduceFunctionsRule`. Also, I think it's better to use `SqlKind.AVG_AGG_FUNCTIONS.contains()` or `AggregateReduceFunctionsRule.isReducible()` (though it is currently private) in the case statement, so that it stays consistent if Calcite changes.
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175056688
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --
We have built-in functions for `AVG` (which we don't really need anymore) and `SUM`, so we could translate such plans.
But I agree, using `SqlKind.AVG_AGG_FUNCTIONS.contains()` is better.
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by walterddr <gi...@git.apache.org>.
Github user walterddr commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175825522
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ // we support AVG
+ case SqlKind.AVG => true
+ // but none of the other AVG agg functions
+ case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+ case _ => true
+ }
+
+ !agg.containsDistinctCall() && supported
--- End diff --
Yes. This was kind of confusing to me; we should clean it up when adding DISTINCT support. Thanks for the update, @fhueske!
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175230116
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,19 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ case SqlKind.STDDEV_POP | SqlKind.STDDEV_SAMP | SqlKind.VAR_POP | SqlKind.VAR_SAMP => false
--- End diff --
Sounds good to me. Will update the PR.
---
[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/5706
updated PR
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDEV_...
Posted by suez1224 <gi...@git.apache.org>.
Github user suez1224 commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r174962566
--- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java ---
@@ -0,0 +1,590 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to you under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.calcite.rel.rules;
+
+import org.apache.calcite.plan.RelOptCluster;
+import org.apache.calcite.plan.RelOptRule;
+import org.apache.calcite.plan.RelOptRuleCall;
+import org.apache.calcite.plan.RelOptRuleOperand;
+import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Aggregate;
+import org.apache.calcite.rel.core.AggregateCall;
+import org.apache.calcite.rel.core.RelFactories;
+import org.apache.calcite.rel.logical.LogicalAggregate;
+import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeFactory;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rex.RexBuilder;
+import org.apache.calcite.rex.RexLiteral;
+import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.sql.SqlAggFunction;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.fun.SqlStdOperatorTable;
+import org.apache.calcite.sql.type.SqlTypeUtil;
+import org.apache.calcite.tools.RelBuilder;
+import org.apache.calcite.tools.RelBuilderFactory;
+import org.apache.calcite.util.CompositeList;
+import org.apache.calcite.util.ImmutableIntList;
+import org.apache.calcite.util.Util;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
--- End diff --
I think we should create a Calcite JIRA issue to make `AggregateReduceFunctionsRule` in Calcite support this extension, reference the ticket here, and remove this copy once Calcite is upgraded. What do you think?
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5706
---
[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/5706
PR updated
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175055646
--- Diff: flink-libraries/flink-table/src/main/java/org/apache/calcite/rel/rules/AggregateReduceFunctionsRule.java ---
@@ -0,0 +1,590 @@
+/*
+ * THIS FILE HAS BEEN COPIED FROM THE APACHE CALCITE PROJECT TO MAKE IT MORE EXTENSIBLE.
--- End diff --
Yes, I agree. It would be much better to have this code in Calcite.
However, the changes are very Flink-specific (we need to add a few fields to the projection).
OTOH, it's just moving some code into a protected function, so there is no change in functionality and only a few lines are touched.
I'll create a JIRA in Calcite and reference the issue. In case Calcite does not want the change, we can keep the class in Flink.
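The refactoring described above (moving existing logic into a protected method so that a subclass can extend it without changing the base behavior) is essentially the template-method pattern. Below is a minimal Scala sketch. `ReduceRuleSketch`, `WindowReduceRuleSketch`, `rewrite` and `newProjection` are hypothetical names, not the methods actually touched in Calcite or in this PR, and projections are modeled as plain strings. The extra fields stand in for the Flink-specific additions to the projection, assumed here to be window properties.

```scala
// Hypothetical names throughout; this is not Calcite's or Flink's actual API.
class ReduceRuleSketch {
  // The overall rewrite stays in the base class and cannot be overridden.
  final def rewrite(reducedExprs: Seq[String]): Seq[String] =
    newProjection(reducedExprs)

  // Logic moved into a protected hook; by default it projects just the reduced expressions.
  protected def newProjection(reducedExprs: Seq[String]): Seq[String] =
    reducedExprs
}

// Subclass that appends extra fields to the projection without duplicating the base logic.
class WindowReduceRuleSketch(extraFields: Seq[String]) extends ReduceRuleSketch {
  override protected def newProjection(reducedExprs: Seq[String]): Seq[String] =
    super.newProjection(reducedExprs) ++ extraFields
}

object ReduceRuleSketchDemo {
  def main(args: Array[String]): Unit = {
    val exprs = Seq("sum_x / count_x")
    println(new ReduceRuleSketch().rewrite(exprs))
    println(new WindowReduceRuleSketch(Seq("w_start", "w_end")).rewrite(exprs))
  }
}
```

Because the base class keeps its default behavior, a change of this shape does not affect existing users of the rule, which matches the argument for proposing it upstream.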
---
[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/5706
Updated the PR with the Calcite issue.
---
[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5706#discussion_r175716765
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
FlinkConventions.LOGICAL,
"FlinkLogicalWindowAggregateConverter") {
+ override def matches(call: RelOptRuleCall): Boolean = {
+ val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+ // we do not support these functions natively
+ // they have to be converted using the WindowAggregateReduceFunctionsRule
+ val supported = agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+ // we support AVG
+ case SqlKind.AVG => true
+ // but none of the other AVG agg functions
+ case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+ case _ => true
+ }
+
+ !agg.containsDistinctCall() && supported
--- End diff --
Hmm, that's a good point.
In fact, there won't be any plan with a `DISTINCT` aggregation in a `LogicalWindowAggregate` because `LogicalWindowAggregateRule` prevents translation of (`Calc(TUMBLE) -> Aggregate()`) into (`WindowAggregate(TUMBLE)`) if there is a distinct aggregate. This prevents window aggregates with `DISTINCT` in SQL queries from being translated into `WindowAggregate`. The Table API does not even have an API to define such queries.
So, I'd simply remove the `containsDistinctCall()` check for now. We should definitely clean this up when we add support for DISTINCT aggregates.
@walterddr, are you fine with this?
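The argument above can be modeled in a few lines of Scala. This is a hypothetical, simplified model: `AggCall`, `WindowAggregate`, `toWindowAggregate` and `converterMatches` are illustrative names, not Flink classes, and aggregate kinds are plain strings. The upstream step refuses to build a window aggregate when any call is `DISTINCT`, so a `containsDistinctCall()` check in the downstream converter can never fail.

```scala
// Hypothetical, simplified model of two planner steps; not Flink's actual classes.
object DistinctGuardSketch {
  final case class AggCall(kind: String, distinct: Boolean)
  final case class WindowAggregate(calls: Seq[AggCall])

  // Models the upstream rule: fuse Calc(TUMBLE) -> Aggregate into a WindowAggregate
  // only if no aggregate call is DISTINCT.
  def toWindowAggregate(calls: Seq[AggCall]): Option[WindowAggregate] =
    if (calls.exists(_.distinct)) None else Some(WindowAggregate(calls))

  private val unsupported = Set("VAR_POP", "VAR_SAMP", "STDDEV_POP", "STDDEV_SAMP")

  // Converter check without a distinct test: every WindowAggregate built above
  // is already free of DISTINCT calls.
  def converterMatches(agg: WindowAggregate): Boolean =
    agg.calls.forall(c => !unsupported.contains(c.kind))
}
```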
---