You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:35 UTC
[flink] 02/11: [hotfix][table] Deduplicate optimize code between stream and batch table environment
This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit ed2e360a6b2c1cbf8708ea1586e82f8cadbd6f62
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Aug 24 13:17:26 2018 +0200
[hotfix][table] Deduplicate optimize code between stream and batch table environment
---
.../flink/table/api/BatchTableEnvironment.scala | 48 ++-------------
.../flink/table/api/StreamTableEnvironment.scala | 71 ++++++----------------
.../apache/flink/table/api/TableEnvironment.scala | 50 ++++++++++++++-
3 files changed, 73 insertions(+), 96 deletions(-)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 04a7916..522a03e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -448,48 +448,12 @@ abstract class BatchTableEnvironment(
* @return The optimized [[RelNode]] tree
*/
private[flink] def optimize(relNode: RelNode): RelNode = {
-
- // 0. convert sub-queries before query decorrelation
- val convSubQueryPlan = runHepPlanner(
- HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet)
-
- // 0. convert table references
- val fullRelNode = runHepPlanner(
- HepMatchOrder.BOTTOM_UP,
- FlinkRuleSets.TABLE_REF_RULES,
- convSubQueryPlan,
- relNode.getTraitSet)
-
- // 1. decorrelate
- val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)
-
- // 2. normalize the logical plan
- val normRuleSet = getNormRuleSet
- val normalizedPlan = if (normRuleSet.iterator().hasNext) {
- runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet)
- } else {
- decorPlan
- }
-
- // 3. optimize the logical Flink plan
- val logicalOptRuleSet = getLogicalOptRuleSet
- val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
- val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
- runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
- } else {
- normalizedPlan
- }
-
- // 4. optimize the physical Flink plan
- val physicalOptRuleSet = getPhysicalOptRuleSet
- val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASET).simplify()
- val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
- runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
- } else {
- logicalPlan
- }
-
- physicalPlan
+ val convSubQueryPlan = optimizeConvertSubQueries(relNode)
+ val fullNode = optimizeConvertTableReferences(convSubQueryPlan)
+ val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+ val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
+ val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+ optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
}
/**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d31ce6c..860f8b2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -803,59 +803,28 @@ abstract class StreamTableEnvironment(
* @return The optimized [[RelNode]] tree
*/
private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
+ val convSubQueryPlan = optimizeConvertSubQueries(relNode)
+ val fullNode = optimizeConvertTableReferences(convSubQueryPlan)
+ val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+ val planWithMaterializedTimeAttributes =
+ RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
+ val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
+ val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+ val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
+ optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
+ }
- // 0. convert sub-queries before query decorrelation
- val convSubQueryPlan = runHepPlanner(
- HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet)
-
- // 0. convert table references
- val fullRelNode = runHepPlanner(
- HepMatchOrder.BOTTOM_UP,
- FlinkRuleSets.TABLE_REF_RULES,
- convSubQueryPlan,
- relNode.getTraitSet)
-
- // 1. decorrelate
- val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)
-
- // 2. convert time indicators
- val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
-
- // 3. normalize the logical plan
- val normRuleSet = getNormRuleSet
- val normalizedPlan = if (normRuleSet.iterator().hasNext) {
- runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
- } else {
- convPlan
- }
-
- // 4. optimize the logical Flink plan
- val logicalOptRuleSet = getLogicalOptRuleSet
- val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
- val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
- runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
- } else {
- normalizedPlan
- }
-
- // 5. optimize the physical Flink plan
- val physicalOptRuleSet = getPhysicalOptRuleSet
- val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
- val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
- runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
- } else {
- logicalPlan
- }
-
- // 6. decorate the optimized plan
+ private[flink] def optimizeDecoratePlan(
+ relNode: RelNode,
+ updatesAsRetraction: Boolean): RelNode = {
val decoRuleSet = getDecoRuleSet
- val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+ if (decoRuleSet.iterator().hasNext) {
val planToDecorate = if (updatesAsRetraction) {
- physicalPlan.copy(
- physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
- physicalPlan.getInputs)
+ relNode.copy(
+ relNode.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
+ relNode.getInputs)
} else {
- physicalPlan
+ relNode
}
runHepPlanner(
HepMatchOrder.BOTTOM_UP,
@@ -863,10 +832,8 @@ abstract class StreamTableEnvironment(
planToDecorate,
planToDecorate.getTraitSet)
} else {
- physicalPlan
+ relNode
}
-
- decoratedPlan
}
/**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 5691ab7..cce270c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.config.Lex
import org.apache.calcite.jdbc.CalciteSchema
import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
-import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptPlanner, RelOptUtil, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.schema.SchemaPlus
import org.apache.calcite.schema.impl.AbstractTable
@@ -54,6 +54,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
import org.apache.flink.table.plan.cost.DataSetCostFactory
import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.rules.FlinkRuleSets
import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSourceSinkTable}
import org.apache.flink.table.sinks.TableSink
@@ -233,6 +234,52 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
protected def getBuiltInPhysicalOptRuleSet: RuleSet
+ protected def optimizeConvertSubQueries(relNode: RelNode): RelNode = {
+ runHepPlanner(
+ HepMatchOrder.BOTTOM_UP,
+ FlinkRuleSets.TABLE_SUBQUERY_RULES,
+ relNode,
+ relNode.getTraitSet)
+ }
+
+ protected def optimizeConvertTableReferences(relNode: RelNode): RelNode = {
+ runHepPlanner(
+ HepMatchOrder.BOTTOM_UP,
+ FlinkRuleSets.TABLE_REF_RULES,
+ relNode,
+ relNode.getTraitSet)
+ }
+
+
+ protected def optimizeNormalizeLogicalPlan(relNode: RelNode): RelNode = {
+ val normRuleSet = getNormRuleSet
+ if (normRuleSet.iterator().hasNext) {
+ runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
+ } else {
+ relNode
+ }
+ }
+
+ protected def optimizeLogicalPlan(relNode: RelNode): RelNode = {
+ val logicalOptRuleSet = getLogicalOptRuleSet
+ val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+ if (logicalOptRuleSet.iterator().hasNext) {
+ runVolcanoPlanner(logicalOptRuleSet, relNode, logicalOutputProps)
+ } else {
+ relNode
+ }
+ }
+
+ protected def optimizePhysicalPlan(relNode: RelNode, convention: Convention): RelNode = {
+ val physicalOptRuleSet = getPhysicalOptRuleSet
+ val physicalOutputProps = relNode.getTraitSet.replace(convention).simplify()
+ if (physicalOptRuleSet.iterator().hasNext) {
+ runVolcanoPlanner(physicalOptRuleSet, relNode, physicalOutputProps)
+ } else {
+ relNode
+ }
+ }
+
/**
* run HEP planner
*/
@@ -1308,5 +1355,4 @@ object TableEnvironment {
case t: TypeInformation[_] => Array(t.asInstanceOf[TypeInformation[_]])
}
}
-
}