You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/24 19:23:36 UTC

[2/5] flink git commit: [FLINK-6089] [table] Add decoration phase for stream queries to rewrite plans after the cost-based optimization.

[FLINK-6089] [table] Add decoration phase for stream queries to rewrite plans after the cost-based optimization.

This closes #3564.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6949c8c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6949c8c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6949c8c7

Branch: refs/heads/master
Commit: 6949c8c79c41344023df08dde2936f06daa00e0d
Parents: f97deaa
Author: hequn.chq <he...@alibaba-inc.com>
Authored: Thu Mar 16 11:11:17 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100

----------------------------------------------------------------------
 .../table/api/StreamTableEnvironment.scala      | 38 ++++++++-
 .../flink/table/calcite/CalciteConfig.scala     | 89 +++++++++++++++++---
 .../flink/table/plan/rules/FlinkRuleSets.scala  |  9 +-
 .../flink/table/CalciteConfigBuilderTest.scala  | 69 +++++++++++++++
 4 files changed, 188 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d927c3a..225a675 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.RuleSet
+import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.GenericTypeInfo
 import org.apache.flink.streaming.api.datastream.DataStream
@@ -39,6 +39,8 @@ import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
 import org.apache.flink.table.sources.{StreamTableSource, TableSource}
 import org.apache.flink.types.Row
 
+import _root_.scala.collection.JavaConverters._
+
 /**
   * The base class for stream TableEnvironments.
   *
@@ -211,6 +213,26 @@ abstract class StreamTableEnvironment(
   }
 
   /**
+    * Returns the decoration rule set for this environment
+    * including a custom RuleSet configuration.
+    */
+  protected def getDecoRuleSet: RuleSet = {
+    val calciteConfig = config.getCalciteConfig
+    calciteConfig.getDecoRuleSet match {
+
+      case None =>
+        getBuiltInDecoRuleSet
+
+      case Some(ruleSet) =>
+        if (calciteConfig.replacesDecoRuleSet) {
+          ruleSet
+        } else {
+          RuleSets.ofList((getBuiltInDecoRuleSet.asScala ++ ruleSet.asScala).asJava)
+        }
+    }
+  }
+
+  /**
     * Returns the built-in normalization rules that are defined by the environment.
     */
   protected def getBuiltInNormRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_NORM_RULES
@@ -221,6 +243,11 @@ abstract class StreamTableEnvironment(
   protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
 
   /**
+    * Returns the built-in decoration rules that are defined by the environment.
+    */
+  protected def getBuiltInDecoRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_DECO_RULES
+
+  /**
     * Generates the optimized [[RelNode]] tree from the original relational node tree.
     *
     * @param relNode The root node of the relational expression tree.
@@ -248,7 +275,14 @@ abstract class StreamTableEnvironment(
       normalizedPlan
     }
 
-    optimizedPlan
+    // 4. decorate the optimized plan
+    val decoRuleSet = getDecoRuleSet
+    val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+      runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, optimizedPlan, optimizedPlan.getTraitSet)
+    } else {
+      optimizedPlan
+    }
+    decoratedPlan
   }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index 65a61b2..ba8df81 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -31,15 +31,36 @@ import scala.collection.JavaConverters._
   * Builder for creating a Calcite configuration.
   */
 class CalciteConfigBuilder {
+
+  /**
+    * Defines the normalization rule set. Normalization rules are dedicated to rewriting
+    * the predicated logical plan before volcano optimization.
+    */
   private var replaceNormRules: Boolean = false
   private var normRuleSets: List[RuleSet] = Nil
 
+  /**
+    * Defines the optimization rule set. Optimization rules are used during volcano optimization.
+    */
   private var replaceOptRules: Boolean = false
   private var optRuleSets: List[RuleSet] = Nil
 
+  /**
+    * Defines the decoration rule set. Decoration rules are dedicated to rewriting the
+    * predicated logical plan after volcano optimization.
+    */
+  private var replaceDecoRules: Boolean = false
+  private var decoRuleSets: List[RuleSet] = Nil
+
+  /**
+    * Defines the SQL operator tables.
+    */
   private var replaceOperatorTable: Boolean = false
   private var operatorTables: List[SqlOperatorTable] = Nil
 
+  /**
+    * Defines a SQL parser configuration.
+    */
   private var replaceSqlParserConfig: Option[SqlParser.Config] = None
 
   /**
@@ -81,6 +102,32 @@ class CalciteConfigBuilder {
   }
 
   /**
+    * Replaces the built-in decoration rule set with the given rule set.
+    *
+    * The decoration rules are applied after the cost-based optimization phase.
+    * The decoration phase allows rewriting the optimized plan and is not cost-based.
+    *
+    */
+  def replaceDecoRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(replaceRuleSet)
+    decoRuleSets = List(replaceRuleSet)
+    replaceDecoRules = true
+    this
+  }
+
+  /**
+    * Appends the given decoration rule set to the built-in rule set.
+    *
+    * The decoration rules are applied after the cost-based optimization phase.
+    * The decoration phase allows rewriting the optimized plan and is not cost-based.
+    */
+  def addDecoRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
+    Preconditions.checkNotNull(addedRuleSet)
+    decoRuleSets = addedRuleSet :: decoRuleSets
+    this
+  }
+
+  /**
     * Replaces the built-in SQL operator table with the given table.
     */
   def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
@@ -113,35 +160,39 @@ class CalciteConfigBuilder {
     val replacesNormRuleSet: Boolean,
     val getOptRuleSet: Option[RuleSet],
     val replacesOptRuleSet: Boolean,
+    val getDecoRuleSet: Option[RuleSet],
+    val replacesDecoRuleSet: Boolean,
     val getSqlOperatorTable: Option[SqlOperatorTable],
     val replacesSqlOperatorTable: Boolean,
     val getSqlParserConfig: Option[SqlParser.Config])
     extends CalciteConfig
 
+
   /**
-    * Builds a new [[CalciteConfig]].
+    * Converts a list of [[RuleSet]]s into a single optional [[RuleSet]] (None if the list is empty).
     */
-  def build(): CalciteConfig = new CalciteConfigImpl(
-    normRuleSets match {
+  private def getRuleSet(inputRuleSet: List[RuleSet]): Option[RuleSet] = {
+    inputRuleSet match {
       case Nil => None
       case h :: Nil => Some(h)
       case _ =>
         // concat rule sets
         val concatRules =
-          normRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
+          inputRuleSet.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
         Some(RuleSets.ofList(concatRules.asJava))
-    },
+    }
+  }
+
+  /**
+    * Builds a new [[CalciteConfig]].
+    */
+  def build(): CalciteConfig = new CalciteConfigImpl(
+    getRuleSet(normRuleSets),
     replaceNormRules,
-    optRuleSets match {
-      case Nil => None
-      case h :: Nil => Some(h)
-      case _ =>
-        // concat rule sets
-        val concatRules =
-          optRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
-        Some(RuleSets.ofList(concatRules.asJava))
-    },
+    getRuleSet(optRuleSets),
     replaceOptRules,
+    getRuleSet(decoRuleSets),
+    replaceDecoRules,
     operatorTables match {
       case Nil => None
       case h :: Nil => Some(h)
@@ -179,6 +230,16 @@ trait CalciteConfig {
   def getOptRuleSet: Option[RuleSet]
 
   /**
+    * Returns whether this configuration replaces the built-in decoration rule set.
+    */
+  def replacesDecoRuleSet: Boolean
+
+  /**
+    * Returns a custom decoration rule set.
+    */
+  def getDecoRuleSet: Option[RuleSet]
+
+  /**
     * Returns whether this configuration replaces the built-in SQL operator table.
     */
   def replacesSqlOperatorTable: Boolean

http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 952ee34..1301c8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -23,7 +23,6 @@ import org.apache.calcite.tools.{RuleSet, RuleSets}
 import org.apache.flink.table.calcite.rules.{FlinkAggregateExpandDistinctAggregatesRule, FlinkAggregateJoinTransposeRule}
 import org.apache.flink.table.plan.rules.dataSet._
 import org.apache.flink.table.plan.rules.datastream._
-import org.apache.flink.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule}
 
 object FlinkRuleSets {
 
@@ -186,4 +185,12 @@ object FlinkRuleSets {
       PushFilterIntoStreamTableSourceScanRule.INSTANCE
   )
 
+  /**
+    * RuleSet to decorate plans for stream / DataStream execution
+    */
+  val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
+    // rules
+
+  )
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index 6c07e28..d0de8fa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -39,6 +39,9 @@ class CalciteConfigBuilderTest {
 
     assertFalse(cc.replacesOptRuleSet)
     assertFalse(cc.getOptRuleSet.isDefined)
+
+    assertFalse(cc.replacesDecoRuleSet)
+    assertFalse(cc.getDecoRuleSet.isDefined)
   }
 
   @Test
@@ -47,6 +50,7 @@ class CalciteConfigBuilderTest {
     val cc: CalciteConfig = new CalciteConfigBuilder()
       .addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
       .replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
       .build()
 
     assertFalse(cc.replacesNormRuleSet)
@@ -54,6 +58,9 @@ class CalciteConfigBuilderTest {
 
     assertTrue(cc.replacesOptRuleSet)
     assertTrue(cc.getOptRuleSet.isDefined)
+
+    assertTrue(cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
   }
 
   @Test
@@ -181,6 +188,68 @@ class CalciteConfigBuilderTest {
   }
 
   @Test
+  def testReplaceDecorationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+  }
+
+  @Test
+  def testReplaceDecorationAddRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
+      .build()
+
+    assertEquals(true, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+    assertEquals(2, cSet.size)
+    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+    assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+  }
+
+  @Test
+  def testAddDecorationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .build()
+
+    assertEquals(false, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+    assertEquals(1, cSet.size)
+    assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+  }
+
+  @Test
+  def testAddAddDecorationRules(): Unit = {
+
+    val cc: CalciteConfig = new CalciteConfigBuilder()
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+      .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE,
+                                      ReduceExpressionsRule.CALC_INSTANCE))
+      .build()
+
+    assertEquals(false, cc.replacesDecoRuleSet)
+    assertTrue(cc.getDecoRuleSet.isDefined)
+    val cList = cc.getDecoRuleSet.get.iterator().asScala.toList
+    assertEquals(3, cList.size)
+    assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE)
+    assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE)
+    assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE)
+  }
+
+  @Test
   def testDefaultOperatorTable(): Unit = {
 
     val cc: CalciteConfig = new CalciteConfigBuilder()