You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 00:32:53 UTC

[flink] 02/02: [FLINK-13076] [table-planner-blink] Support true condition on lookup join condition

This is an automated email from the ASF dual-hosted git repository.

kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9df0a032b023180aa113b6f2e68ad6ef27e0c7d6
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jul 8 17:05:25 2019 +0800

    [FLINK-13076] [table-planner-blink] Support true condition on lookup join condition
    
    This closes #8962
---
 .../table/plan/rules/FlinkBatchRuleSets.scala      |  3 +-
 .../table/plan/rules/FlinkStreamRuleSets.scala     |  3 +-
 ...relateToJoinFromTemporalTableFunctionRule.scala |  6 +-
 ...gicalCorrelateToJoinFromTemporalTableRule.scala | 76 +++++++++++++++++-----
 .../table/plan/batch/sql/join/LookupJoinTest.xml   | 28 ++++++++
 .../table/plan/stream/sql/join/LookupJoinTest.xml  | 28 ++++++++
 .../table/plan/batch/sql/join/LookupJoinTest.scala | 12 ++++
 .../plan/stream/sql/join/LookupJoinTest.scala      | 13 ++++
 8 files changed, 149 insertions(+), 20 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index cce4016..7a1ba34 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -54,7 +54,8 @@ object FlinkBatchRuleSets {
     * can create new plan nodes.
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
+    LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER, // temporal join ON <condition>
+    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER, // temporal join ON true
     TableScanRule.INSTANCE)
 
   val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 35da87e..a95ef31 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -54,7 +54,8 @@ object FlinkStreamRuleSets {
     * can create new plan nodes.
     */
   val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
-    LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
+    LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER, // temporal join ON <condition>
+    LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER, // temporal join ON true
     LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE,
     TableScanRule.INSTANCE)
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index 6b8f075..c4870b1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -24,11 +24,12 @@ import org.apache.flink.table.expressions.{FieldReferenceExpression, _}
 import org.apache.flink.table.functions.utils.TableSqlFunction
 import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl}
 import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
 import org.apache.flink.table.plan.util.TemporalJoinUtil.{makeProcTimeTemporalJoinConditionCall, makeRowTimeTemporalJoinConditionCall}
+import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
 import org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasRoot, isProctimeAttribute}
 import org.apache.flink.util.Preconditions.checkState
+
 import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
 import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
 import org.apache.calcite.rel.RelNode
@@ -45,8 +46,7 @@ import org.apache.calcite.rex._
 class LogicalCorrelateToJoinFromTemporalTableFunctionRule
   extends RelOptRule(
     operand(classOf[LogicalCorrelate],
-      some(
-        operand(classOf[RelNode], any()),
+      some(operand(classOf[RelNode], any()),
         operand(classOf[TableFunctionScan], none()))),
     "LogicalCorrelateToJoinFromTemporalTableFunctionRule") {
 
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 38f677e..15e3070 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.plan.rules.logical
 
 import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
 import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalSnapshot}
 import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, RexNode, RexShuttle}
@@ -29,26 +29,24 @@ import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, R
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecLookupJoin]] in physical and
   * might be translated into
   * [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalJoin]] in the future.
-  *
-  * TODO supports `true` join condition
   */
-class LogicalCorrelateToJoinFromTemporalTableRule
-  extends RelOptRule(
-    operand(classOf[LogicalCorrelate],
-      operand(classOf[RelNode], any()),
-      operand(classOf[LogicalFilter],
-        operand(classOf[LogicalSnapshot], any()))),
-    "LogicalCorrelateToJoinFromTemporalTableRule") {
+abstract class LogicalCorrelateToJoinFromTemporalTableRule(
+    operand: RelOptRuleOperand,
+    description: String)
+  extends RelOptRule(operand, description) {
+
+  def getFilterCondition(call: RelOptRuleCall): RexNode
+
+  def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
 
   override def onMatch(call: RelOptRuleCall): Unit = {
     val correlate: LogicalCorrelate = call.rel(0)
     val leftInput: RelNode = call.rel(1)
-    val filter: LogicalFilter = call.rel(2)
-    val snapshot: LogicalSnapshot = call.rel[LogicalSnapshot](3)
+    val filterCondition = getFilterCondition(call)
+    val snapshot = getLogicalSnapshot(call)
 
     val leftRowType = leftInput.getRowType
-    val condition = filter.getCondition
-    val joinCondition = condition.accept(new RexShuttle() {
+    val joinCondition = filterCondition.accept(new RexShuttle() {
       // change correlate variable expression to normal RexInputRef (which is from left side)
       override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
         fieldAccess.getReferenceExpr match {
@@ -78,6 +76,54 @@ class LogicalCorrelateToJoinFromTemporalTableRule
 
 }
 
+/**
+  * Planner rule that matches a temporal table join whose join condition is not `true`,
+  * i.e. the right input of the Correlate is a Filter on top of a Snapshot.
+  * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+  * ON T.a = D.id
+  */
+class LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
+  extends LogicalCorrelateToJoinFromTemporalTableRule(
+    operand(classOf[LogicalCorrelate],
+      operand(classOf[RelNode], any()),
+      operand(classOf[LogicalFilter],
+        operand(classOf[LogicalSnapshot], any()))),
+    "LogicalCorrelateToJoinFromTemporalTableRuleWithFilter"
+  ) {
+
+  override def getFilterCondition(call: RelOptRuleCall): RexNode = {
+    // matched rels: 0 = Correlate, 1 = left input, 2 = Filter, 3 = Snapshot
+    call.rel[LogicalFilter](2).getCondition
+  }
+
+  override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot = {
+    call.rel[LogicalSnapshot](3)
+  }
+}
+
+/**
+  * Planner rule that matches a temporal table join whose join condition is `true`,
+  * i.e. the right input of the Correlate is the Snapshot itself (no Filter in between).
+  * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON true
+  */
+class LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter
+  extends LogicalCorrelateToJoinFromTemporalTableRule(
+    operand(classOf[LogicalCorrelate],
+      operand(classOf[RelNode], any()),
+      operand(classOf[LogicalSnapshot], any())),
+    "LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter"
+  ) {
+
+  // With no Filter to read a condition from, the join condition is the literal `true`.
+  override def getFilterCondition(call: RelOptRuleCall): RexNode = call.builder().literal(true)
+
+  override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot = {
+    // matched rels: 0 = Correlate, 1 = left input, 2 = Snapshot
+    call.rel[LogicalSnapshot](2)
+  }
+}
+
 object LogicalCorrelateToJoinFromTemporalTableRule {
-  val INSTANCE = new LogicalCorrelateToJoinFromTemporalTableRule
+  val WITH_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithFilter() // ON <condition>
+  val WITHOUT_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter() // ON true
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index 0377621..8eb5180 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -236,6 +236,34 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinTemporalTableWithTrueCondition">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable AS T
+JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ON true
+WHERE T.c > 1000
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
++- LogicalFilter(condition=[>($2, 1000)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+      :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+      :  +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, id, name, age])
++- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[>(c, 1000)])
+   +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], fields=[a, b, c])
+]]>
+    </Resource>
+  </TestCase>
   <TestCase name="testReusing">
     <Resource name="sql">
       <![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
index 8e56ae1..358f601 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
@@ -242,4 +242,32 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinTemporalTableWithTrueCondition">
+    <Resource name="sql">
+      <![CDATA[
+SELECT * FROM MyTable AS T
+JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ON true
+WHERE T.c > 1000
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalFilter(condition=[>($2, 1000)])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalSnapshot(period=[$cor0.proctime])
+         +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, rowtime, id, name, age])
+   +- Calc(select=[a, b, c, proctime, rowtime], where=[>(c, 1000)])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index b1083d0..d9067a43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -215,6 +215,18 @@ class LookupJoinTest extends TableTestBase {
   }
 
   @Test
+  def testJoinTemporalTableWithTrueCondition(): Unit = {
+    // `ON true` leaves no Filter between the Correlate and the Snapshot (see planBefore)
+    testUtil.verifyPlan(
+      """
+        |SELECT * FROM MyTable AS T
+        |JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+        |ON true
+        |WHERE T.c > 1000
+      """.stripMargin)
+  }
+
+  @Test
   def testReusing(): Unit = {
     testUtil.tableEnv.getConfig.getConf.setBoolean(
       OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 1294ff4..d01616a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -317,6 +317,19 @@ class LookupJoinTest extends TableTestBase with Serializable {
     streamUtil.verifyPlan(sql)
   }
 
+  @Test
+  def testJoinTemporalTableWithTrueCondition(): Unit = {
+    // `ON true` leaves no Filter between the Correlate and the Snapshot (see planBefore)
+    val query =
+      """
+        |SELECT * FROM MyTable AS T
+        |JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+        |ON true
+        |WHERE T.c > 1000
+      """.stripMargin
+    streamUtil.verifyPlan(query)
+  }
+
   // ==========================================================================================
 
   private def expectExceptionThrown(