You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by fh...@apache.org on 2017/03/30 22:04:35 UTC
[24/50] [abbrv] flink git commit: [FLINK-6089] [table] Add decoration
phase for stream queries to rewrite plans after the cost-based optimization.
[FLINK-6089] [table] Add decoration phase for stream queries to rewrite plans after the cost-based optimization.
This closes #3564.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6949c8c7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6949c8c7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6949c8c7
Branch: refs/heads/table-retraction
Commit: 6949c8c79c41344023df08dde2936f06daa00e0d
Parents: f97deaa
Author: hequn.chq <he...@alibaba-inc.com>
Authored: Thu Mar 16 11:11:17 2017 +0800
Committer: Fabian Hueske <fh...@apache.org>
Committed: Fri Mar 24 20:19:17 2017 +0100
----------------------------------------------------------------------
.../table/api/StreamTableEnvironment.scala | 38 ++++++++-
.../flink/table/calcite/CalciteConfig.scala | 89 +++++++++++++++++---
.../flink/table/plan/rules/FlinkRuleSets.scala | 9 +-
.../flink/table/CalciteConfigBuilderTest.scala | 69 +++++++++++++++
4 files changed, 188 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index d927c3a..225a675 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -25,7 +25,7 @@ import org.apache.calcite.plan.hep.HepMatchOrder
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.sql2rel.RelDecorrelator
-import org.apache.calcite.tools.RuleSet
+import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.typeutils.GenericTypeInfo
import org.apache.flink.streaming.api.datastream.DataStream
@@ -39,6 +39,8 @@ import org.apache.flink.table.sinks.{StreamTableSink, TableSink}
import org.apache.flink.table.sources.{StreamTableSource, TableSource}
import org.apache.flink.types.Row
+import _root_.scala.collection.JavaConverters._
+
/**
* The base class for stream TableEnvironments.
*
@@ -211,6 +213,26 @@ abstract class StreamTableEnvironment(
}
/**
+ * Returns the decoration rule set for this environment
+ * including a custom RuleSet configuration.
+ */
+ protected def getDecoRuleSet: RuleSet = {
+ val calciteConfig = config.getCalciteConfig
+ calciteConfig.getDecoRuleSet match {
+
+ case None =>
+ getBuiltInDecoRuleSet
+
+ case Some(ruleSet) =>
+ if (calciteConfig.replacesDecoRuleSet) {
+ ruleSet
+ } else {
+ RuleSets.ofList((getBuiltInDecoRuleSet.asScala ++ ruleSet.asScala).asJava)
+ }
+ }
+ }
+
+ /**
* Returns the built-in normalization rules that are defined by the environment.
*/
protected def getBuiltInNormRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_NORM_RULES
@@ -221,6 +243,11 @@ abstract class StreamTableEnvironment(
protected def getBuiltInOptRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_OPT_RULES
/**
+ * Returns the built-in decoration rules that are defined by the environment.
+ */
+ protected def getBuiltInDecoRuleSet: RuleSet = FlinkRuleSets.DATASTREAM_DECO_RULES
+
+ /**
* Generates the optimized [[RelNode]] tree from the original relational node tree.
*
* @param relNode The root node of the relational expression tree.
@@ -248,7 +275,14 @@ abstract class StreamTableEnvironment(
normalizedPlan
}
- optimizedPlan
+ // 4. decorate the optimized plan
+ val decoRuleSet = getDecoRuleSet
+ val decoratedPlan = if (decoRuleSet.iterator().hasNext) {
+ runHepPlanner(HepMatchOrder.BOTTOM_UP, decoRuleSet, optimizedPlan, optimizedPlan.getTraitSet)
+ } else {
+ optimizedPlan
+ }
+ decoratedPlan
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
index 65a61b2..ba8df81 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/calcite/CalciteConfig.scala
@@ -31,15 +31,36 @@ import scala.collection.JavaConverters._
* Builder for creating a Calcite configuration.
*/
class CalciteConfigBuilder {
+
+ /**
+ * Defines the normalization rule set. Normalization rules are dedicated to rewriting
+ * the predicated logical plan before volcano optimization.
+ */
private var replaceNormRules: Boolean = false
private var normRuleSets: List[RuleSet] = Nil
+ /**
+ * Defines the optimization rule set. Optimization rules are used during volcano optimization.
+ */
private var replaceOptRules: Boolean = false
private var optRuleSets: List[RuleSet] = Nil
+ /**
+ * Defines the decoration rule set. Decoration rules are dedicated to rewriting
+ * the predicated logical plan after volcano optimization.
+ */
+ private var replaceDecoRules: Boolean = false
+ private var decoRuleSets: List[RuleSet] = Nil
+
+ /**
+ * Defines the SQL operator tables.
+ */
private var replaceOperatorTable: Boolean = false
private var operatorTables: List[SqlOperatorTable] = Nil
+ /**
+ * Defines a SQL parser configuration.
+ */
private var replaceSqlParserConfig: Option[SqlParser.Config] = None
/**
@@ -81,6 +102,32 @@ class CalciteConfigBuilder {
}
/**
+ * Replaces the built-in decoration rule set with the given rule set.
+ *
+ * The decoration rules are applied after the cost-based optimization phase.
+ * The decoration phase allows rewriting the optimized plan and is not cost-based.
+ *
+ */
+ def replaceDecoRuleSet(replaceRuleSet: RuleSet): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(replaceRuleSet)
+ decoRuleSets = List(replaceRuleSet)
+ replaceDecoRules = true
+ this
+ }
+
+ /**
+ * Appends the given decoration rule set to the built-in rule set.
+ *
+ * The decoration rules are applied after the cost-based optimization phase.
+ * The decoration phase allows rewriting the optimized plan and is not cost-based.
+ */
+ def addDecoRuleSet(addedRuleSet: RuleSet): CalciteConfigBuilder = {
+ Preconditions.checkNotNull(addedRuleSet)
+ decoRuleSets = addedRuleSet :: decoRuleSets
+ this
+ }
+
+ /**
* Replaces the built-in SQL operator table with the given table.
*/
def replaceSqlOperatorTable(replaceSqlOperatorTable: SqlOperatorTable): CalciteConfigBuilder = {
@@ -113,35 +160,39 @@ class CalciteConfigBuilder {
val replacesNormRuleSet: Boolean,
val getOptRuleSet: Option[RuleSet],
val replacesOptRuleSet: Boolean,
+ val getDecoRuleSet: Option[RuleSet],
+ val replacesDecoRuleSet: Boolean,
val getSqlOperatorTable: Option[SqlOperatorTable],
val replacesSqlOperatorTable: Boolean,
val getSqlParserConfig: Option[SqlParser.Config])
extends CalciteConfig
+
/**
- * Builds a new [[CalciteConfig]].
+ * Converts a list of [[RuleSet]]s into a single optional [[RuleSet]].
*/
- def build(): CalciteConfig = new CalciteConfigImpl(
- normRuleSets match {
+ private def getRuleSet(inputRuleSet: List[RuleSet]): Option[RuleSet] = {
+ inputRuleSet match {
case Nil => None
case h :: Nil => Some(h)
case _ =>
// concat rule sets
val concatRules =
- normRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
+ inputRuleSet.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
Some(RuleSets.ofList(concatRules.asJava))
- },
+ }
+ }
+
+ /**
+ * Builds a new [[CalciteConfig]].
+ */
+ def build(): CalciteConfig = new CalciteConfigImpl(
+ getRuleSet(normRuleSets),
replaceNormRules,
- optRuleSets match {
- case Nil => None
- case h :: Nil => Some(h)
- case _ =>
- // concat rule sets
- val concatRules =
- optRuleSets.foldLeft(Nil: Iterable[RelOptRule])((c, r) => r.asScala ++ c)
- Some(RuleSets.ofList(concatRules.asJava))
- },
+ getRuleSet(optRuleSets),
replaceOptRules,
+ getRuleSet(decoRuleSets),
+ replaceDecoRules,
operatorTables match {
case Nil => None
case h :: Nil => Some(h)
@@ -179,6 +230,16 @@ trait CalciteConfig {
def getOptRuleSet: Option[RuleSet]
/**
+ * Returns whether this configuration replaces the built-in decoration rule set.
+ */
+ def replacesDecoRuleSet: Boolean
+
+ /**
+ * Returns a custom decoration rule set.
+ */
+ def getDecoRuleSet: Option[RuleSet]
+
+ /**
* Returns whether this configuration replaces the built-in SQL operator table.
*/
def replacesSqlOperatorTable: Boolean
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 952ee34..1301c8d 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -23,7 +23,6 @@ import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.table.calcite.rules.{FlinkAggregateExpandDistinctAggregatesRule, FlinkAggregateJoinTransposeRule}
import org.apache.flink.table.plan.rules.dataSet._
import org.apache.flink.table.plan.rules.datastream._
-import org.apache.flink.table.plan.rules.datastream.{DataStreamCalcRule, DataStreamScanRule, DataStreamUnionRule}
object FlinkRuleSets {
@@ -186,4 +185,12 @@ object FlinkRuleSets {
PushFilterIntoStreamTableSourceScanRule.INSTANCE
)
+ /**
+ * RuleSet to decorate plans for stream / DataStream execution
+ */
+ val DATASTREAM_DECO_RULES: RuleSet = RuleSets.ofList(
+ // rules
+
+ )
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6949c8c7/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
index 6c07e28..d0de8fa 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/CalciteConfigBuilderTest.scala
@@ -39,6 +39,9 @@ class CalciteConfigBuilderTest {
assertFalse(cc.replacesOptRuleSet)
assertFalse(cc.getOptRuleSet.isDefined)
+
+ assertFalse(cc.replacesDecoRuleSet)
+ assertFalse(cc.getDecoRuleSet.isDefined)
}
@Test
@@ -47,6 +50,7 @@ class CalciteConfigBuilderTest {
val cc: CalciteConfig = new CalciteConfigBuilder()
.addNormRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.replaceOptRuleSet(RuleSets.ofList(FilterMergeRule.INSTANCE))
+ .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
.build()
assertFalse(cc.replacesNormRuleSet)
@@ -54,6 +58,9 @@ class CalciteConfigBuilderTest {
assertTrue(cc.replacesOptRuleSet)
assertTrue(cc.getOptRuleSet.isDefined)
+
+ assertTrue(cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
}
@Test
@@ -181,6 +188,68 @@ class CalciteConfigBuilderTest {
}
@Test
+ def testReplaceDecorationRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .build()
+
+ assertEquals(true, cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+ assertEquals(1, cSet.size)
+ assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ }
+
+ @Test
+ def testReplaceDecorationAddRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .replaceDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE))
+ .build()
+
+ assertEquals(true, cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+ assertEquals(2, cSet.size)
+ assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ assertTrue(cSet.contains(ReduceExpressionsRule.PROJECT_INSTANCE))
+ }
+
+ @Test
+ def testAddDecorationRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .build()
+
+ assertEquals(false, cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cSet = cc.getDecoRuleSet.get.iterator().asScala.toSet
+ assertEquals(1, cSet.size)
+ assertTrue(cSet.contains(ReduceExpressionsRule.FILTER_INSTANCE))
+ }
+
+ @Test
+ def testAddAddDecorationRules(): Unit = {
+
+ val cc: CalciteConfig = new CalciteConfigBuilder()
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.FILTER_INSTANCE))
+ .addDecoRuleSet(RuleSets.ofList(ReduceExpressionsRule.PROJECT_INSTANCE,
+ ReduceExpressionsRule.CALC_INSTANCE))
+ .build()
+
+ assertEquals(false, cc.replacesDecoRuleSet)
+ assertTrue(cc.getDecoRuleSet.isDefined)
+ val cList = cc.getDecoRuleSet.get.iterator().asScala.toList
+ assertEquals(3, cList.size)
+ assertEquals(cList.head, ReduceExpressionsRule.FILTER_INSTANCE)
+ assertEquals(cList(1), ReduceExpressionsRule.PROJECT_INSTANCE)
+ assertEquals(cList(2), ReduceExpressionsRule.CALC_INSTANCE)
+ }
+
+ @Test
def testDefaultOperatorTable(): Unit = {
val cc: CalciteConfig = new CalciteConfigBuilder()