Posted to issues@flink.apache.org by hequn8128 <gi...@git.apache.org> on 2018/01/24 05:27:47 UTC
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
GitHub user hequn8128 opened a pull request:
https://github.com/apache/flink/pull/5347
[FLINK-8492][table] Fix calc cost bug
## What is the purpose of the change
Fix the calc cost bug. Currently, an unsupported exception is thrown when multiple calcs exist between a correlate and a TableFunctionScan.
## Brief change log
- Add a constant 1 to compCnt in `CommonCalc`
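To illustrate why a per-node constant matters, here is a minimal sketch under a simplified cost model (CPU cost = row count × computation count, with filter selectivity ignored). `CalcCostSketch`, `cpuCost`, `chainCost`, `mergedCost` and their parameters are hypothetical names used only for illustration; they are not part of Flink. Only the idea of adding 1 to the computation count comes from this change.

```scala
object CalcCostSketch {
  // Hypothetical simplified model: CPU cost of a single calc node.
  // nodeConstant = 0 models the old behaviour, nodeConstant = 1 the proposed one.
  def cpuCost(rowCnt: Double, computations: Int, nodeConstant: Int): Double =
    rowCnt * (computations + nodeConstant)

  // Total cost of a chain of un-merged calcs.
  // Assumption: every calc in the chain sees the same number of rows.
  def chainCost(rowCnt: Double, computations: Seq[Int], nodeConstant: Int): Double =
    computations.map(cpuCost(rowCnt, _, nodeConstant)).sum

  // Cost of the single calc obtained by merging the chain.
  // Assumption: the merged program contains the sum of the individual computations.
  def mergedCost(rowCnt: Double, computations: Seq[Int], nodeConstant: Int): Double =
    cpuCost(rowCnt, computations.sum, nodeConstant)
}
```

For 100 rows and two calcs with 2 and 3 computations, this model gives 500 for both plans without the constant, so the planner has no reason to prefer the merged calc. With the constant, the un-merged chain costs 700 and the merged calc costs 600.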
## Verifying this change
This change added tests and can be verified as follows:
- Added integration tests for UDTFs with multiple calcs
## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
- The S3 file system connector: (no)
## Documentation
- Does this pull request introduce a new feature? (no)
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/hequn8128/flink 8492
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/5347.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #5347
----
commit bf1b4d4773c3690dfe73a0e71b22730d8b5eb99c
Author: hequn8128 <ch...@...>
Date: 2018-01-24T04:25:13Z
[FLINK-8492][table] Fix calc cost bug
----
---
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5347#discussion_r163880994
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/CorrelateUtil.scala ---
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.{RexProgram, RexProgramBuilder}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableFunctionScan}
+
+/**
+ * A utility for DataStream and DataSet correlate rules.
+ */
+object CorrelateUtil {
+
+ /**
+ * Find only calc and table function
+ */
+ def findCalcAndTableFunction(calc: FlinkLogicalCalc): Boolean = {
+ val child = calc.getInput.asInstanceOf[RelSubset].getOriginal
+ child match {
+ case scan: FlinkLogicalTableFunctionScan => true
+ case calc: FlinkLogicalCalc => findCalcAndTableFunction(calc)
+ case _ => false
+ }
+ }
+
+ /**
+ * Get [[FlinkLogicalTableFunctionScan]] from the input calc.
+ */
+ def getTableScan(calc: FlinkLogicalCalc): RelNode = {
--- End diff --
Rename to `getTableFunctionScan`.
---
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:
https://github.com/apache/flink/pull/5347
Hi @fhueske, thanks for your suggestions. The PR has been updated according to your comments.
---
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5347#discussion_r163879206
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---
@@ -141,6 +143,7 @@ trait CommonCalc {
}
private[flink] def computeSelfCost(
+ rexBuilder: RexBuilder,
--- End diff --
We don't need the `RexBuilder`
---
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:
https://github.com/apache/flink/pull/5347
Hi @fhueske @twalthr, the PR has been updated. It would be great if you could take a look at it.
The changes mainly include:
1. Adapt `estimateRowCount` to be more accurate. The original implementation uses a constant 0.75 to reduce the result, which makes the row count of a merged calc larger than the row count of the un-merged calcs. The current implementation uses a more accurate selectivity to reduce the result row count.
2. Merge calcs in the convert rule of the correlate. This is a double check to make sure the unsupported exception won't be thrown.
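
The row-count issue in item 1 can be seen with a small sketch. It assumes two stacked calcs that each carry a filter and compares the fixed 0.75 factor with a caller-supplied selectivity. `RowCountSketch`, its method names, and any selectivity values passed in are made up for illustration and do not reflect Flink's actual estimator.

```scala
object RowCountSketch {
  // Old behaviour as described above: a calc with a condition keeps 75% of its
  // input, but never fewer than one row.
  // Assumption: a calc without a condition passes its input row count through.
  def constantEstimate(rowCnt: Double, hasCondition: Boolean): Double =
    if (hasCondition) (rowCnt * 0.75).max(1.0) else rowCnt

  // Selectivity-based estimate: the caller supplies the estimated fraction of
  // rows that survive the condition.
  def selectivityEstimate(rowCnt: Double, selectivity: Double): Double =
    (rowCnt * selectivity).max(1.0)

  // Two un-merged calcs, each with a filter, under the constant model.
  def unmergedConstant(rowCnt: Double): Double =
    constantEstimate(constantEstimate(rowCnt, hasCondition = true), hasCondition = true)

  // One merged calc whose condition is the conjunction of both filters,
  // under the constant model.
  def mergedConstant(rowCnt: Double): Double =
    constantEstimate(rowCnt, hasCondition = true)
}
```

Under the constant model, 1000 input rows become 562.5 after two stacked calcs but 750 after the equivalent merged calc, so merging looks more expensive downstream. If the conjunction's selectivity is taken as the product of the individual selectivities (an independence assumption), the stacked and merged estimates agree.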
---
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on the issue:
https://github.com/apache/flink/pull/5347
Thanks for the fix @hequn8128!
PR is good to merge.
---
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5347#discussion_r163878119
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---
@@ -149,26 +152,42 @@ trait CommonCalc {
// conditions, etc. We only want to account for computations, not for simple projections.
// CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
// in normalization stage. So we should ignore CASTs here in optimization stage.
- val compCnt = calcProgram.getExprList.asScala.toList.count {
- case _: RexInputRef => false
- case _: RexLiteral => false
- case c: RexCall if c.getOperator.getName.equals("CAST") => false
- case _ => true
- }
+ // Also, we add 1 to take the number of calc RelNodes into consideration, so the cost of a merged calc
+ // RelNode will be less than the total cost of the un-merged calcs.
+ val compCnt = calcProgram.getExprList.asScala.toList.count(isCom(_)) + 1
- val newRowCnt = estimateRowCount(calcProgram, rowCnt)
+ val newRowCnt = estimateRowCount(rexBuilder, calcProgram, rowCnt)
planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0)
}
private[flink] def estimateRowCount(
+ rexBuilder: RexBuilder,
--- End diff --
We don't need the `RexBuilder` if we get the condition `RexNode` from the `RexProgram`.
---
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/5347
Thank you @hequn8128. I will merge this...
---
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5347#discussion_r163880920
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/CorrelateUtil.scala ---
@@ -0,0 +1,79 @@
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.{RexProgram, RexProgramBuilder}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableFunctionScan}
+
+/**
+ * A utility for DataStream and DataSet correlate rules.
+ */
+object CorrelateUtil {
+
+ /**
+ * Find only calc and table function
+ */
+ def findCalcAndTableFunction(calc: FlinkLogicalCalc): Boolean = {
--- End diff --
Remove (functionality replaced by `getTableScan`).
---
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5347#discussion_r163880732
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/common/CorrelateUtil.scala ---
@@ -0,0 +1,79 @@
+package org.apache.flink.table.plan.rules.common
+
+import org.apache.calcite.plan.volcano.RelSubset
+import org.apache.calcite.rel.RelNode
+import org.apache.calcite.rex.{RexProgram, RexProgramBuilder}
+import org.apache.flink.table.api.TableException
+import org.apache.flink.table.plan.nodes.logical.{FlinkLogicalCalc, FlinkLogicalTableFunctionScan}
+
+/**
+ * A utility for DataStream and DataSet correlate rules.
+ */
+object CorrelateUtil {
+
+ /**
+ * Find only calc and table function
+ */
+ def findCalcAndTableFunction(calc: FlinkLogicalCalc): Boolean = {
+ val child = calc.getInput.asInstanceOf[RelSubset].getOriginal
+ child match {
+ case scan: FlinkLogicalTableFunctionScan => true
+ case calc: FlinkLogicalCalc => findCalcAndTableFunction(calc)
+ case _ => false
+ }
+ }
+
+ /**
+ * Get [[FlinkLogicalTableFunctionScan]] from the input calc.
+ */
+ def getTableScan(calc: FlinkLogicalCalc): RelNode = {
+ val child = calc.getInput.asInstanceOf[RelSubset].getOriginal
+ child match {
+ case scan: FlinkLogicalTableFunctionScan => scan
+ case calc: FlinkLogicalCalc => getTableScan(calc)
+ case _ => throw TableException("This must be a bug, could not find table scan")
--- End diff --
Combine this method with `findCalcAndTableFunction` and return an `Option[FlinkLogicalTableFunctionScan]`.
If the function returns `None`, there is no table function at the end.
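
A minimal sketch of the combined method could look like the following. The node types here are hypothetical stand-ins so that the example is self-contained; the real code would match on `FlinkLogicalCalc` and `FlinkLogicalTableFunctionScan` and unwrap the `RelSubset` as in the diff above.

```scala
import scala.annotation.tailrec

object CorrelateUtilSketch {
  // Hypothetical stand-ins for the planner nodes (not the Flink classes).
  sealed trait Node
  final case class TableFunctionScan(name: String) extends Node
  final case class Calc(input: Node) extends Node
  final case class OtherNode(name: String) extends Node

  // Walks down a chain of calcs and returns the table function scan at the end,
  // or None if the chain ends in any other kind of node.
  @tailrec
  def getTableFunctionScan(calc: Calc): Option[TableFunctionScan] =
    calc.input match {
      case scan: TableFunctionScan => Some(scan)
      case child: Calc             => getTableFunctionScan(child)
      case _                       => None
    }
}
```

A caller can then use `isDefined` where it previously called `findCalcAndTableFunction`, and no exception path is needed for the "not found" case.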
---
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:
https://github.com/apache/flink/pull/5347
Hi @fhueske @twalthr,
To solve the problem, we need to make `estimateRowCount` in `CommonCalc` more accurate. I will update the PR tomorrow. That said, the cost model cannot solve the problem deterministically: the cost is only an estimate, so multiple calcs will still exist under some circumstances.
As for the correlate, to make sure the unsupported exception is not thrown, I will double-check whether multiple calcs exist and merge them if needed.
What do you think? Thanks, Hequn.
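
The "double check and merge" step might be sketched as follows. This is only an illustration with hypothetical stand-in types: a calc is reduced to a list of expression names, and merging simply concatenates them bottom-up. The real rule would presumably merge the calcs' `RexProgram`s with Calcite's facilities, which this sketch does not model.

```scala
import scala.annotation.tailrec

object MergeCalcsSketch {
  // Hypothetical stand-ins for planner nodes (not the Flink classes).
  sealed trait Node
  final case class Scan(name: String) extends Node
  final case class Calc(exprs: List[String], input: Node) extends Node

  // Collapses a chain of directly stacked calcs into a single calc.
  // Expressions of lower calcs come first, since they are evaluated first.
  @tailrec
  def mergeCalcs(calc: Calc): Calc =
    calc.input match {
      case child: Calc => mergeCalcs(Calc(child.exprs ++ calc.exprs, child.input))
      case _           => calc
    }
}
```

Running this merge in the correlate convert rule means the rule only ever sees one calc above the table function scan, regardless of which plan the cost model happened to pick.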
---
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/flink/pull/5347
---
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Posted by twalthr <gi...@git.apache.org>.
Github user twalthr commented on the issue:
https://github.com/apache/flink/pull/5347
Thanks @hequn8128 and @fhueske. I will merge this...
---
[GitHub] flink issue #5347: [FLINK-8492][table] Fix calc cost bug
Posted by hequn8128 <gi...@git.apache.org>.
Github user hequn8128 commented on the issue:
https://github.com/apache/flink/pull/5347
@twalthr Thanks for your review. I will take a look and update the PR.
---
[GitHub] flink pull request #5347: [FLINK-8492][table] Fix calc cost bug
Posted by fhueske <gi...@git.apache.org>.
Github user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/5347#discussion_r163877879
--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/CommonCalc.scala ---
@@ -149,26 +152,42 @@ trait CommonCalc {
// conditions, etc. We only want to account for computations, not for simple projections.
// CASTs in RexProgram are reduced as far as possible by ReduceExpressionsRule
// in normalization stage. So we should ignore CASTs here in optimization stage.
- val compCnt = calcProgram.getExprList.asScala.toList.count {
- case _: RexInputRef => false
- case _: RexLiteral => false
- case c: RexCall if c.getOperator.getName.equals("CAST") => false
- case _ => true
- }
+ // Also, we add 1 to take the number of calc RelNodes into consideration, so the cost of a merged calc
+ // RelNode will be less than the total cost of the un-merged calcs.
+ val compCnt = calcProgram.getExprList.asScala.toList.count(isCom(_)) + 1
- val newRowCnt = estimateRowCount(calcProgram, rowCnt)
+ val newRowCnt = estimateRowCount(rexBuilder, calcProgram, rowCnt)
planner.getCostFactory.makeCost(newRowCnt, newRowCnt * compCnt, 0)
}
private[flink] def estimateRowCount(
+ rexBuilder: RexBuilder,
calcProgram: RexProgram,
rowCnt: Double): Double = {
if (calcProgram.getCondition != null) {
// we reduce the result card to push filters down
- (rowCnt * 0.75).max(1.0)
+ val exprs = RexUtil.composeConjunction(
--- End diff --
Use `calcProgram.expandLocalRef(calcProgram.getCondition())` to include only the expressions that are relevant for the condition.
---