You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ku...@apache.org on 2019/07/11 00:32:53 UTC
[flink] 02/02: [FLINK-13076] [table-planner-blink] Support true
condition on lookup join condition
This is an automated email from the ASF dual-hosted git repository.
kurt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9df0a032b023180aa113b6f2e68ad6ef27e0c7d6
Author: godfreyhe <go...@163.com>
AuthorDate: Mon Jul 8 17:05:25 2019 +0800
[FLINK-13076] [table-planner-blink] Support true condition on lookup join condition
This closes #8962
---
.../table/plan/rules/FlinkBatchRuleSets.scala | 3 +-
.../table/plan/rules/FlinkStreamRuleSets.scala | 3 +-
...relateToJoinFromTemporalTableFunctionRule.scala | 6 +-
...gicalCorrelateToJoinFromTemporalTableRule.scala | 76 +++++++++++++++++-----
.../table/plan/batch/sql/join/LookupJoinTest.xml | 28 ++++++++
.../table/plan/stream/sql/join/LookupJoinTest.xml | 28 ++++++++
.../table/plan/batch/sql/join/LookupJoinTest.scala | 12 ++++
.../plan/stream/sql/join/LookupJoinTest.scala | 13 ++++
8 files changed, 149 insertions(+), 20 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
index cce4016..7a1ba34 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkBatchRuleSets.scala
@@ -54,7 +54,8 @@ object FlinkBatchRuleSets {
* can create new plan nodes.
*/
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
- LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
+ LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
+ LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
TableScanRule.INSTANCE)
val POST_EXPAND_CLEAN_UP_RULES: RuleSet = RuleSets.ofList(
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
index 35da87e..a95ef31 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/FlinkStreamRuleSets.scala
@@ -54,7 +54,8 @@ object FlinkStreamRuleSets {
* can create new plan nodes.
*/
val EXPAND_PLAN_RULES: RuleSet = RuleSets.ofList(
- LogicalCorrelateToJoinFromTemporalTableRule.INSTANCE,
+ LogicalCorrelateToJoinFromTemporalTableRule.WITH_FILTER,
+ LogicalCorrelateToJoinFromTemporalTableRule.WITHOUT_FILTER,
LogicalCorrelateToJoinFromTemporalTableFunctionRule.INSTANCE,
TableScanRule.INSTANCE)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
index 6b8f075..c4870b1 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableFunctionRule.scala
@@ -24,11 +24,12 @@ import org.apache.flink.table.expressions.{FieldReferenceExpression, _}
import org.apache.flink.table.functions.utils.TableSqlFunction
import org.apache.flink.table.functions.{TemporalTableFunction, TemporalTableFunctionImpl}
import org.apache.flink.table.operations.QueryOperation
-import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
import org.apache.flink.table.plan.util.TemporalJoinUtil.{makeProcTimeTemporalJoinConditionCall, makeRowTimeTemporalJoinConditionCall}
+import org.apache.flink.table.plan.util.{ExpandTableScanShuttle, RexDefaultVisitor}
import org.apache.flink.table.types.logical.LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE
import org.apache.flink.table.types.logical.utils.LogicalTypeChecks.{hasRoot, isProctimeAttribute}
import org.apache.flink.util.Preconditions.checkState
+
import org.apache.calcite.plan.RelOptRule.{any, none, operand, some}
import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
import org.apache.calcite.rel.RelNode
@@ -45,8 +46,7 @@ import org.apache.calcite.rex._
class LogicalCorrelateToJoinFromTemporalTableFunctionRule
extends RelOptRule(
operand(classOf[LogicalCorrelate],
- some(
- operand(classOf[RelNode], any()),
+ some(operand(classOf[RelNode], any()),
operand(classOf[TableFunctionScan], none()))),
"LogicalCorrelateToJoinFromTemporalTableFunctionRule") {
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
index 38f677e..15e3070 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/plan/rules/logical/LogicalCorrelateToJoinFromTemporalTableRule.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.plan.rules.logical
import org.apache.calcite.plan.RelOptRule.{any, operand}
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.logical.{LogicalCorrelate, LogicalFilter, LogicalSnapshot}
import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, RexNode, RexShuttle}
@@ -29,26 +29,24 @@ import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexInputRef, R
* [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecLookupJoin]] in physical and
* might be translated into
* [[org.apache.flink.table.plan.nodes.physical.stream.StreamExecTemporalJoin]] in the future.
- *
- * TODO supports `true` join condition
*/
-class LogicalCorrelateToJoinFromTemporalTableRule
- extends RelOptRule(
- operand(classOf[LogicalCorrelate],
- operand(classOf[RelNode], any()),
- operand(classOf[LogicalFilter],
- operand(classOf[LogicalSnapshot], any()))),
- "LogicalCorrelateToJoinFromTemporalTableRule") {
+abstract class LogicalCorrelateToJoinFromTemporalTableRule(
+ operand: RelOptRuleOperand,
+ description: String)
+ extends RelOptRule(operand, description) {
+
+ def getFilterCondition(call: RelOptRuleCall): RexNode
+
+ def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot
override def onMatch(call: RelOptRuleCall): Unit = {
val correlate: LogicalCorrelate = call.rel(0)
val leftInput: RelNode = call.rel(1)
- val filter: LogicalFilter = call.rel(2)
- val snapshot: LogicalSnapshot = call.rel[LogicalSnapshot](3)
+ val filterCondition = getFilterCondition(call)
+ val snapshot = getLogicalSnapshot(call)
val leftRowType = leftInput.getRowType
- val condition = filter.getCondition
- val joinCondition = condition.accept(new RexShuttle() {
+ val joinCondition = filterCondition.accept(new RexShuttle() {
// change correlate variable expression to normal RexInputRef (which is from left side)
override def visitFieldAccess(fieldAccess: RexFieldAccess): RexNode = {
fieldAccess.getReferenceExpr match {
@@ -78,6 +76,54 @@ class LogicalCorrelateToJoinFromTemporalTableRule
}
+/**
+ * Planner rule that matches a temporal table join whose join condition is not true,
+ * which means the right input of the Correlate is a Filter.
+ * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ * ON T.a = D.id
+ */
+class LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
+ extends LogicalCorrelateToJoinFromTemporalTableRule(
+ operand(classOf[LogicalCorrelate],
+ operand(classOf[RelNode], any()),
+ operand(classOf[LogicalFilter],
+ operand(classOf[LogicalSnapshot], any()))),
+ "LogicalCorrelateToJoinFromTemporalTableRuleWithFilter"
+ ) {
+
+ override def getFilterCondition(call: RelOptRuleCall): RexNode = {
+ val filter: LogicalFilter = call.rel(2)
+ filter.getCondition
+ }
+
+ override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot = {
+ call.rels(3).asInstanceOf[LogicalSnapshot]
+ }
+}
+
+/**
+ * Planner rule that matches a temporal table join whose join condition is true,
+ * which means the right input of the Correlate is a Snapshot.
+ * e.g. SELECT * FROM MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON true
+ */
+class LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter
+ extends LogicalCorrelateToJoinFromTemporalTableRule(
+ operand(classOf[LogicalCorrelate],
+ operand(classOf[RelNode], any()),
+ operand(classOf[LogicalSnapshot], any())),
+ "LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter"
+ ) {
+
+ override def getFilterCondition(call: RelOptRuleCall): RexNode = {
+ call.builder().literal(true)
+ }
+
+ override def getLogicalSnapshot(call: RelOptRuleCall): LogicalSnapshot = {
+ call.rels(2).asInstanceOf[LogicalSnapshot]
+ }
+}
+
object LogicalCorrelateToJoinFromTemporalTableRule {
- val INSTANCE = new LogicalCorrelateToJoinFromTemporalTableRule
+ val WITH_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithFilter
+ val WITHOUT_FILTER = new LogicalCorrelateToJoinFromTemporalTableRuleWithoutFilter
}
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
index 0377621..8eb5180 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.xml
@@ -236,6 +236,34 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=
]]>
</Resource>
</TestCase>
+ <TestCase name="testJoinTemporalTableWithTrueCondition">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM MyTable AS T
+JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ON true
+WHERE T.c > 1000
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], id=[$4], name=[$5], age=[$6])
++- LogicalFilter(condition=[>($2, 1000)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+ :- LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[PROCTIME()])
+ : +- LogicalTableScan(table=[[default_catalog, default_database, T0]])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, id, name, age])
++- Calc(select=[a, b, c, PROCTIME() AS proctime], where=[>(c, 1000)])
+ +- BoundedStreamScan(table=[[default_catalog, default_database, T0]], fields=[a, b, c])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testReusing">
<Resource name="sql">
<![CDATA[
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
index 8e56ae1..358f601 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.xml
@@ -242,4 +242,32 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id])
]]>
</Resource>
</TestCase>
+ <TestCase name="testJoinTemporalTableWithTrueCondition">
+ <Resource name="sql">
+ <![CDATA[
+SELECT * FROM MyTable AS T
+JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ON true
+WHERE T.c > 1000
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], proctime=[$3], rowtime=[$4], id=[$5], name=[$6], age=[$7])
++- LogicalFilter(condition=[>($2, 1000)])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, name, age])
++- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], on=[], select=[a, b, c, proctime, rowtime, id, name, age])
+ +- Calc(select=[a, b, c, proctime, rowtime], where=[>(c, 1000)])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
index b1083d0..d9067a43 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/batch/sql/join/LookupJoinTest.scala
@@ -215,6 +215,18 @@ class LookupJoinTest extends TableTestBase {
}
@Test
+ def testJoinTemporalTableWithTrueCondition(): Unit = {
+ val sql =
+ """
+ |SELECT * FROM MyTable AS T
+ |JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ |ON true
+ |WHERE T.c > 1000
+ """.stripMargin
+ testUtil.verifyPlan(sql)
+ }
+
+ @Test
def testReusing(): Unit = {
testUtil.tableEnv.getConfig.getConf.setBoolean(
OptimizerConfigOptions.SQL_OPTIMIZER_REUSE_SUB_PLAN_ENABLED, true)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
index 1294ff4..d01616a 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/plan/stream/sql/join/LookupJoinTest.scala
@@ -317,6 +317,19 @@ class LookupJoinTest extends TableTestBase with Serializable {
streamUtil.verifyPlan(sql)
}
+ @Test
+ def testJoinTemporalTableWithTrueCondition(): Unit = {
+ val sql =
+ """
+ |SELECT * FROM MyTable AS T
+ |JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D
+ |ON true
+ |WHERE T.c > 1000
+ """.stripMargin
+
+ streamUtil.verifyPlan(sql)
+ }
+
// ==========================================================================================
private def expectExceptionThrown(