You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 02:25:07 UTC
[flink] 02/02: [FLINK-13509][table-planner-blink] Forbid `IS NOT
DISTINCT FROM` (or an expanded version) in LookupJoin.
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 78d1e68ab120e2fecb931e6d9ea1ee8bef247cde
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Aug 2 10:13:45 2019 +0800
[FLINK-13509][table-planner-blink] Forbid `IS NOT DISTINCT FROM` (or an expanded version) in LookupJoin.
---
.../physical/common/CommonLookupJoinRule.scala | 24 ++++++++++++++++++++++
.../plan/batch/sql/join/LookupJoinTest.scala | 22 ++++++++++++++++++++
.../plan/stream/sql/join/LookupJoinTest.scala | 22 ++++++++++++++++++++
3 files changed, 68 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
index 08510b2..62bd3dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin
import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.planner.plan.utils.JoinUtil
import org.apache.flink.table.sources.{LookupableTableSource, TableSource}
import org.apache.calcite.plan.RelOptRule.{any, operand}
@@ -30,6 +31,10 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexProgram}
+import java.util
+
+import scala.collection.JavaConversions._
+
/**
* Base implementation for both
* [[org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecLookupJoinRule]] and
@@ -86,6 +91,23 @@ trait CommonLookupJoinRule {
}
}
+  /**
+    * Rejects lookup joins whose condition uses a null-safe equality.
+    * TODO Support `IS NOT DISTINCT FROM` in the future: FLINK-13509
+    *
+    * @param join logical join whose condition is inspected
+    * @throws TableException if any collected `filterNulls` flag is `false`
+    */
+  protected def validateJoin(join: FlinkLogicalJoin): Unit = {
+    // Filled by createJoinInfo; presumably one flag per equi-join key, false = null-safe.
+    val filterNulls = new util.ArrayList[java.lang.Boolean]
+    JoinUtil.createJoinInfo(join.getLeft, join.getRight, join.getCondition, filterNulls)
+    // Query the Java list directly instead of converting it to a Scala array first.
+    if (filterNulls.contains(java.lang.Boolean.FALSE)) {
+      throw new TableException(
+        s"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        s"alternative '(a = b) or (a IS NULL AND b IS NULL)'), the join condition is " +
+        s"'${join.getCondition}'")
+    }
+  }
+
protected def transform(
join: FlinkLogicalJoin,
input: FlinkLogicalRel,
@@ -115,6 +137,7 @@ abstract class BaseSnapshotOnTableScanRule(description: String)
val tableScan = call.rel[RelNode](3)
val tableSource = findTableSource(tableScan).orNull
+ validateJoin(join)
val temporalJoin = transform(join, input, tableSource, None)
call.transformTo(temporalJoin)
}
@@ -145,6 +168,7 @@ abstract class BaseSnapshotOnCalcTableScanRule(description: String)
val tableScan = call.rel[RelNode](4)
val tableSource = findTableSource(tableScan).orNull
+ validateJoin(join)
val temporalJoin = transform(
join, input, tableSource, Some(calc.getProgram))
call.transformTo(temporalJoin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
index 64e1a8b..ae0414dd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
@@ -86,6 +86,28 @@ class LookupJoinTest extends TableTestBase {
}
@Test
+  def testNotDistinctFromInJoinCondition(): Unit = {
+    // Message fragment that must appear in the error for both rejected forms.
+    val expectedMessage =
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')"
+    val joinPrefix = "SELECT * FROM MyTable AS T LEFT JOIN temporalTest "
+
+    // First the explicit `IS NOT DISTINCT FROM` operator, then its expanded equivalent.
+    val rejectedClauses = Seq(
+      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id",
+      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)")
+
+    rejectedClauses.foreach { clause =>
+      expectExceptionThrown(joinPrefix + clause, expectedMessage, classOf[TableException])
+    }
+  }
+
+ @Test
def testLogicalPlan(): Unit = {
val sql1 =
"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 2b21265..5bf012b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -101,6 +101,28 @@ class LookupJoinTest extends TableTestBase with Serializable {
}
@Test
+  def testNotDistinctFromInJoinCondition(): Unit = {
+    // Message fragment that must appear in the error for both rejected forms.
+    val expectedMessage =
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')"
+    val joinPrefix = "SELECT * FROM MyTable AS T LEFT JOIN temporalTest "
+
+    // First the explicit `IS NOT DISTINCT FROM` operator, then its expanded equivalent.
+    val rejectedClauses = Seq(
+      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id",
+      "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)")
+
+    rejectedClauses.foreach { clause =>
+      expectExceptionThrown(joinPrefix + clause, expectedMessage, classOf[TableException])
+    }
+  }
+
+ @Test
def testInvalidLookupTableFunction(): Unit = {
streamUtil.addDataStream[(Int, String, Long, Timestamp)](
"T", 'a, 'b, 'c, 'ts, 'proctime.proctime)