Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/11/05 12:29:53 UTC

[GitHub] pnowojski closed pull request #6981: [FLINK-10638][table] Invalid table scan resolution for temporal join queries

URL: https://github.com/apache/flink/pull/6981

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
index 6a7a921bffb..99e9d7e6d01 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/BatchTableEnvironment.scala
@@ -449,9 +449,8 @@ abstract class BatchTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val normalizedPlan = optimizeNormalizeLogicalPlan(decorPlan)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
     optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASET)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
index 4973f34147f..8c6a1e0a04b 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
@@ -804,13 +804,13 @@ abstract class StreamTableEnvironment(
     */
   private[flink] def optimize(relNode: RelNode, updatesAsRetraction: Boolean): RelNode = {
     val convSubQueryPlan = optimizeConvertSubQueries(relNode)
-    val temporalTableJoinPlan = optimizeConvertToTemporalJoin(convSubQueryPlan)
-    val fullNode = optimizeConvertTableReferences(temporalTableJoinPlan)
-    val decorPlan = RelDecorrelator.decorrelateQuery(fullNode)
+    val expandedPlan = optimizeExpandPlan(convSubQueryPlan)
+    val decorPlan = RelDecorrelator.decorrelateQuery(expandedPlan)
     val planWithMaterializedTimeAttributes =
       RelTimeIndicatorConverter.convert(decorPlan, getRelBuilder.getRexBuilder)
     val normalizedPlan = optimizeNormalizeLogicalPlan(planWithMaterializedTimeAttributes)
     val logicalPlan = optimizeLogicalPlan(normalizedPlan)
+
     val physicalPlan = optimizePhysicalPlan(logicalPlan, FlinkConventions.DATASTREAM)
     optimizeDecoratePlan(physicalPlan, updatesAsRetraction)
   }
@@ -827,7 +827,7 @@ abstract class StreamTableEnvironment(
       } else {
         relNode
       }
-      runHepPlanner(
+      runHepPlannerSequentially(
         HepMatchOrder.BOTTOM_UP,
         decoRuleSet,
         planToDecorate,
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
index 58831d10271..26f9e50fedd 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala
@@ -25,7 +25,7 @@ import com.google.common.collect.ImmutableList
 import org.apache.calcite.config.Lex
 import org.apache.calcite.jdbc.CalciteSchema
 import org.apache.calcite.plan.RelOptPlanner.CannotPlanException
-import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgramBuilder}
+import org.apache.calcite.plan.hep.{HepMatchOrder, HepPlanner, HepProgram, HepProgramBuilder}
 import org.apache.calcite.plan.{Convention, RelOptPlanner, RelOptUtil, RelTraitSet}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.schema.SchemaPlus
@@ -256,34 +256,31 @@ abstract class TableEnvironment(val config: TableConfig) {
   protected def getBuiltInPhysicalOptRuleSet: RuleSet
 
   protected def optimizeConvertSubQueries(relNode: RelNode): RelNode = {
-    runHepPlanner(
+    runHepPlannerSequentially(
       HepMatchOrder.BOTTOM_UP,
       FlinkRuleSets.TABLE_SUBQUERY_RULES,
       relNode,
       relNode.getTraitSet)
   }
 
-  protected def optimizeConvertToTemporalJoin(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TEMPORAL_JOIN_RULES,
+  protected def optimizeExpandPlan(relNode: RelNode): RelNode = {
+    val result = runHepPlannerSimultaneously(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.EXPAND_PLAN_RULES,
       relNode,
       relNode.getTraitSet)
-  }
 
-  protected def optimizeConvertTableReferences(relNode: RelNode): RelNode = {
-    runHepPlanner(
-      HepMatchOrder.BOTTOM_UP,
-      FlinkRuleSets.TABLE_REF_RULES,
-      relNode,
-      relNode.getTraitSet)
+    runHepPlannerSequentially(
+      HepMatchOrder.TOP_DOWN,
+      FlinkRuleSets.POST_EXPAND_CLEAN_UP_RULES,
+      result,
+      result.getTraitSet)
   }
 
-
   protected def optimizeNormalizeLogicalPlan(relNode: RelNode): RelNode = {
     val normRuleSet = getNormRuleSet
     if (normRuleSet.iterator().hasNext) {
-      runHepPlanner(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
+      runHepPlannerSequentially(HepMatchOrder.BOTTOM_UP, normRuleSet, relNode, relNode.getTraitSet)
     } else {
       relNode
     }
@@ -310,13 +307,16 @@ abstract class TableEnvironment(val config: TableConfig) {
   }
 
   /**
-    * run HEP planner
+    * run HEP planner with rules applied one by one: first apply one rule to all of the
+    * nodes and only then apply the next rule. If a rule creates a new node, preceding
+    * rules will not be applied to the newly created node.
     */
-  protected def runHepPlanner(
+  protected def runHepPlannerSequentially(
     hepMatchOrder: HepMatchOrder,
     ruleSet: RuleSet,
     input: RelNode,
     targetTraits: RelTraitSet): RelNode = {
+
     val builder = new HepProgramBuilder
     builder.addMatchOrder(hepMatchOrder)
 
@@ -324,8 +324,36 @@ abstract class TableEnvironment(val config: TableConfig) {
     while (it.hasNext) {
       builder.addRuleInstance(it.next())
     }
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner with rules applied simultaneously: apply all of the rules to the
+    * given node before going to the next one. If a rule creates a new node, all of the
+    * rules will be applied to this new node.
+    */
+  protected def runHepPlannerSimultaneously(
+    hepMatchOrder: HepMatchOrder,
+    ruleSet: RuleSet,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {
+
+    val builder = new HepProgramBuilder
+    builder.addMatchOrder(hepMatchOrder)
+
+    builder.addRuleCollection(ruleSet.asScala.toList.asJava)
+    runHepPlanner(builder.build(), input, targetTraits)
+  }
+
+  /**
+    * run HEP planner
+    */
+  protected def runHepPlanner(
+    hepProgram: HepProgram,
+    input: RelNode,
+    targetTraits: RelTraitSet): RelNode = {
 
-    val planner = new HepPlanner(builder.build, frameworkConfig.getContext)
+    val planner = new HepPlanner(hepProgram, frameworkConfig.getContext)
     planner.setRoot(input)
     if (input.getTraitSet != targetTraits) {
       planner.changeTraits(input, targetTraits.simplify)
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
index 6e2ccdeba5b..5e0ee32ad6c 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/FlinkRuleSets.scala
@@ -39,18 +39,14 @@ object FlinkRuleSets {
     SubQueryRemoveRule.JOIN)
 
   /**
-    * Handles proper conversion of correlate queries with temporal table functions
-    * into temporal table joins. This can create new table scans in the plan.
+    * Expand the plan by replacing table references with proper plan sub-trees. These
+    * rules can create new plan nodes.
     */
-  val TEMPORAL_JOIN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToTemporalTableJoinRule.INSTANCE
-  )
+  val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
+    LogicalCorrelateToTemporalTableJoinRule.INSTANCE,
+    TableScanRule.INSTANCE)
 
-  /**
-    * Convert table references before query decorrelation.
-    */
-  val TABLE_REF_RULES: RuleSet = RuleSets.ofList(
-    TableScanRule.INSTANCE,
+  val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
     EnumerableToLogicalTableScan.INSTANCE)
 
   val LOGICAL_OPT_RULES: RuleSet = RuleSets.ofList(
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
index 3c47f562aae..27c40bbbef2 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
 
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)
 
     val sqlQuery =
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
index f8d49238430..299c14417b6 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/table/TemporalTableJoinTest.scala
@@ -87,7 +87,9 @@ class TemporalTableJoinTest extends TableTestBase {
 
     val ratesHistory = util.addTable[(Timestamp, String, String, Int, Int)](
       "RatesHistory", 'rowtime.rowtime, 'comment, 'currency, 'rate, 'secondary_key)
-    val rates = ratesHistory.createTemporalTableFunction('rowtime, 'currency)
+    val rates = ratesHistory
+      .filter('rate > 110L)
+      .createTemporalTableFunction('rowtime, 'currency)
     util.addFunction("Rates", rates)
 
     val result = orders
@@ -226,7 +228,8 @@ object TemporalTableJoinTest {
           unaryNode(
             "DataStreamCalc",
             streamTableNode(2),
-            term("select", "rowtime, currency, rate, secondary_key")
+            term("select", "rowtime, currency, rate, secondary_key"),
+            term("where", ">(rate, 110)")
           ),
           term("where",
             "AND(" +
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
index df0f01be0d5..0fb175370fb 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/TemporalJoinITCase.scala
@@ -127,8 +127,6 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {
 
     var expectedOutput = new mutable.HashSet[String]()
     expectedOutput += (2 * 114).toString
-    expectedOutput += (1 * 102).toString
-    expectedOutput += (50 * 1).toString
     expectedOutput += (3 * 116).toString
 
     val orders = env
@@ -142,11 +140,15 @@ class TemporalJoinITCase extends StreamingWithStateTestBase {
 
     tEnv.registerTable("Orders", orders)
     tEnv.registerTable("RatesHistory", ratesHistory)
+    tEnv.registerTable("FilteredRatesHistory", tEnv.scan("RatesHistory").filter('rate > 110L))
     tEnv.registerFunction(
       "Rates",
-      ratesHistory.createTemporalTableFunction('rowtime, 'currency))
+      tEnv.scan("FilteredRatesHistory").createTemporalTableFunction('rowtime, 'currency))
+    tEnv.registerTable("TemporalJoinResult", tEnv.sqlQuery(sqlQuery))
 
-    val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+    // Scan from the registered table to test the interplay between
+    // LogicalCorrelateToTemporalTableJoinRule and TableScanRule
+    val result = tEnv.scan("TemporalJoinResult").toAppendStream[Row]
     result.addSink(new StreamITCase.StringSink[Row])
     env.execute()
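
The heart of the refactoring is the split of the old runHepPlanner into a
sequential and a simultaneous variant, mirroring the two ways a Calcite
HepProgram can be assembled. The following standalone sketch shows the
difference; the Calcite API calls are the same ones the patch uses, while
the object and method names here are only illustrative:

import scala.collection.JavaConverters._

import org.apache.calcite.plan.hep.{HepMatchOrder, HepProgram, HepProgramBuilder}
import org.apache.calcite.tools.RuleSet

object HepProgramShapes {

  // Sequential: one program instruction per rule, so rule N is applied
  // exhaustively to the whole plan before rule N+1 starts. Nodes created
  // by a later rule are never revisited by earlier rules.
  def sequentialProgram(order: HepMatchOrder, rules: RuleSet): HepProgram = {
    val builder = new HepProgramBuilder
    builder.addMatchOrder(order)
    rules.asScala.foreach(rule => builder.addRuleInstance(rule))
    builder.build()
  }

  // Simultaneous: the whole rule set is registered as a single collection,
  // so every node is matched against all rules, including nodes that a
  // rule from the same set has just created.
  def simultaneousProgram(order: HepMatchOrder, rules: RuleSet): HepProgram = {
    val builder = new HepProgramBuilder
    builder.addMatchOrder(order)
    builder.addRuleCollection(rules.asScala.toList.asJava)
    builder.build()
  }
}

Running EXPAND_PLAN_RULES through the simultaneous variant means that the
plan nodes produced by LogicalCorrelateToTemporalTableJoinRule are visible
to TableScanRule within the same pass, which the previous sequential
ordering could not guarantee.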
 
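On the user-facing side, the updated tests show what the fix enables: a
temporal table function defined over a derived table (a filtered scan)
instead of directly over a registered table. Below is a condensed sketch of
that pattern, assuming a "RatesHistory" table with rowtime, currency and
rate columns has already been registered; names are taken from the PR's
tests and the surrounding setup from TemporalJoinITCase is abbreviated:

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.api.scala._

object TemporalFunctionOverDerivedTable {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // Register a derived table: a filtered view over the history table
    // (the registration of "RatesHistory" itself is assumed to have
    // happened earlier, as in the test).
    tEnv.registerTable(
      "FilteredRatesHistory",
      tEnv.scan("RatesHistory").filter('rate > 110L))

    // Defining the temporal table function over the derived table used to
    // fail with an invalid table scan resolution; it works after this
    // change because the correlate-to-temporal-join conversion and the
    // table scan expansion run in one simultaneous HEP pass.
    tEnv.registerFunction(
      "Rates",
      tEnv.scan("FilteredRatesHistory")
        .createTemporalTableFunction('rowtime, 'currency))
  }
}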


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services