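You are viewing a plain-text version of this content. The canonical link for it is available on the original mailing-list archive page (the hyperlink is not preserved in this plain-text rendering).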
Posted to commits@flink.apache.org by pn...@apache.org on 2018/09/21 11:43:35 UTC

[flink] 02/11: [hotfix][table] Deduplicate optimize code between stream and batch table environment

This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ed2e360a6b2c1cbf8708ea1586e82f8cadbd6f62
Author: Piotr Nowojski <pi...@gmail.com>
AuthorDate: Fri Aug 24 13:17:26 2018 +0200

    [hotfix][table] Deduplicate optimize code between stream and batch table environment
---
 .../flink/table/api/BatchTableEnvironment.scala    | 48 ++-------------
 .../flink/table/api/StreamTableEnvironment.scala   | 71 ++++++----------------
 .../apache/flink/table/api/TableEnvironment.scala  | 50 ++++++++++++++-
 3 files changed, 73 insertions(+), 96 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 04a7916..522a03e 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -448,48 +448,12 @@ abstract class BatchTableEnvironment(
     * @return The optimized [[RelNode]] tree
     */
   private[flink] def optimize(relNode: RelNode): RelNode = {
-
-    // 0. convert sub-queries before query decorrelation
-    val convSubQueryPlan = runHepPlanner(
-      HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet)
-
-    // 0. convert table references
-    val fullRelNode = runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TABLE_REF_RULES,
-      convSubQueryPlan,
-      relNode.getTraitSet)
-
-    // 1. decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)
-
-    // 2. normalize the logical plan
-    val normRuleSet = getNormRuleSet
-    val normalizedPlan = if (normRuleSet.iterator().hasNext) {
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, decorPlan, decorPlan.getTraitSet)
-    } else {
-      decorPlan
-    }
-
-    // 3. optimize the logical Flink plan
-    val logicalOptRuleSet = getLogicalOptRuleSet
-    val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
-    val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
-      runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
-    } else {
-      normalizedPlan
-    }
-
-    // 4. optimize the physical Flink plan
-    val physicalOptRuleSet = getPhysicalOptRuleSet
-    val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASET).simplify()
-    val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
-      runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
-    } else {
-      logicalPlan
-    }
-
-    physicalPlan
+    val convSubQueryPlan = optimizeConvertSubQueries(relNode)
+    val fullNode = optimizeConvertTableReferences(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
+    val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+    optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
   }
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d31ce6c..860f8b2 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -803,59 +803,28 @@ abstract class StreamTableEnvironment(
     * @return The optimized [[RelNode]] tree
     */
   private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
+    val convSubQueryPlan = optimizeConvertSubQueries(relNode)
+    val fullNode = optimizeConvertTableReferences(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val planWithMaterializedTimeAttributes =
+      RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
+    val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
+    val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+    val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
+    optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
+  }
 
-    // 0. convert sub-queries before query decorrelation
-    val convSubQueryPlan = runHepPlanner(
-      HepMatchOrder.BOTTOM_UP, FlinkRuleSets.TABLE_SUBQUERY_RULES, relNode, relNode.getTraitSet)
-
-    // 0. convert table references
-    val fullRelNode = runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TABLE_REF_RULES,
-      convSubQueryPlan,
-      relNode.getTraitSet)
-
-    // 1. decorrelate
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullRelNode)
-
-    // 2. convert time indicators
-    val convPlan = RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
-
-    // 3. normalize the logical plan
-    val normRuleSet = getNormRuleSet
-    val normalizedPlan = if (normRuleSet.iterator().hasNext) {
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, convPlan, convPlan.getTraitSet)
-    } else {
-      convPlan
-    }
-
-    // 4. optimize the logical Flink plan
-    val logicalOptRuleSet = getLogicalOptRuleSet
-    val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
-    val logicalPlan = if (logicalOptRuleSet.iterator().hasNext) {
-      runVolcanoPlanner(logicalOptRuleSet, normalizedPlan, logicalOutputProps)
-    } else {
-      normalizedPlan
-    }
-
-    // 5. optimize the physical Flink plan
-    val physicalOptRuleSet = getPhysicalOptRuleSet
-    val physicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.DATASTREAM).simplify()
-    val physicalPlan = if (physicalOptRuleSet.iterator().hasNext) {
-      runVolcanoPlanner(physicalOptRuleSet, logicalPlan, physicalOutputProps)
-    } else {
-      logicalPlan
-    }
-
-    // 6. decorate the optimized plan
+  private[flink] def optimizeDecoratePlan(
+      relNode: RelNode,
+      updatesAsRetraction: Boolean): RelNode = {
     val decoRuleSet = getDecoRuleSet
-    val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+    if (decoRuleSet.iterator().hasNext) {
       val planToDecorate = if (updatesAsRetraction) {
-        physicalPlan.copy(
-          physicalPlan.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
-          physicalPlan.getInputs)
+        relNode.copy(
+          relNode.getTraitSet.plus(new UpdateAsRetractionTrait(true)),
+          relNode.getInputs)
       } else {
-        physicalPlan
+        relNode
       }
       runHepPlanner(
         HepMatchOrder.BOTTOM_UP,
@@ -863,10 +832,8 @@ abstract class StreamTableEnvironment(
         planToDecorate,
         planToDecorate.getTraitSet)
     } else {
-      physicalPlan
+      relNode
     }
-
-    decoratedPlan
   }
 
   /**
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 5691ab7..cce270c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -26,7 +26,7 @@ import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
 import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
-import org.apache.calcite.plan.{RelOptPlanner, RelOptUtil, RelTraitSet}
+import org.apache.calcite.plan.{Convention, RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.schema.SchemaPlus
 import org.apache.calcite.schema.impl.AbstractTable
@@ -54,6 +54,7 @@ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
 import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
 import org.apache.flink.table.plan.cost.DataSetCostFactory
 import org.apache.flink.table.plan.logical.{CatalogNode, LogicalRelNode}
+import org.apache.flink.table.plan.nodes.FlinkConventions
 import org.apache.flink.table.plan.rules.FlinkRuleSets
 import org.apache.flink.table.plan.schema.{RelTable, RowSchema, TableSourceSinkTable}
 import org.apache.flink.table.sinks.TableSink
@@ -233,6 +234,52 @@ abstract class TableEnvironment(val config: TableConfig) {
     */
   protected def getBuiltInPhysicalOptRuleSet: RuleSet
 
+  protected def optimizeConvertSubQueries(relNode: RelNode): RelNode = {
+    runHepPlanner(
+      HepMatchOrder.BOTTOM_UP,
+      FlinkRuleSets.TABLE_SUBQUERY_RULES,
+      relNode,
+      relNode.getTraitSet)
+  }
+
+  protected def optimizeConvertTableReferences(relNode: RelNode): RelNode = {
+    runHepPlanner(
+      HepMatchOrder.BOTTOM_UP,
+      FlinkRuleSets.TABLE_REF_RULES,
+      relNode,
+      relNode.getTraitSet)
+  }
+
+
+  protected def optimizeNormalizeLogicalPlan(relNode: RelNode): RelNode = {
+    val normRuleSet = getNormRuleSet
+    if (normRuleSet.iterator().hasNext) {
+      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
+    } else {
+      relNode
+    }
+  }
+
+  protected def optimizeLogicalPlan(relNode: RelNode): RelNode = {
+    val logicalOptRuleSet = getLogicalOptRuleSet
+    val logicalOutputProps = relNode.getTraitSet.replace(FlinkConventions.LOGICAL).simplify()
+    if (logicalOptRuleSet.iterator().hasNext) {
+      runVolcanoPlanner(logicalOptRuleSet, relNode, logicalOutputProps)
+    } else {
+      relNode
+    }
+  }
+
+  protected def optimizePhysicalPlan(relNode: RelNode, convention: Convention): RelNode = {
+    val physicalOptRuleSet = getPhysicalOptRuleSet
+    val physicalOutputProps = relNode.getTraitSet.replace(convention).simplify()
+    if (physicalOptRuleSet.iterator().hasNext) {
+      runVolcanoPlanner(physicalOptRuleSet, relNode, physicalOutputProps)
+    } else {
+      relNode
+    }
+  }
+
   /**
     * run HEP planner
     */
@@ -1308,5 +1355,4 @@ object TableEnvironment {
       case t: TypeInformation[_] => Array(t.asInstanceOf[TypeInformation[_]])
     }
   }
-
 }