You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 02:25:07 UTC

[flink] 02/02: [FLINK-13509][table-planner-blink] Forbid `IS NOT DISTINCT FROM` (or its expanded form) in LookupJoin.

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78d1e68ab120e2fecb931e6d9ea1ee8bef247cde
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Aug 2 10:13:45 2019 +0800

    [FLINK-13509][table-planner-blink] Forbid `IS NOT DISTINCT FROM` (or its expanded form) in LookupJoin.
---
 .../physical/common/CommonLookupJoinRule.scala     | 24 ++++++++++++++++++++++
 .../plan/batch/sql/join/LookupJoinTest.scala       | 22 ++++++++++++++++++++
 .../plan/stream/sql/join/LookupJoinTest.scala      | 22 ++++++++++++++++++++
 3 files changed, 68 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
index 08510b2..62bd3dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin
 import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.planner.plan.utils.JoinUtil
 import org.apache.flink.table.sources.{LookupableTableSource, TableSource}
 
 import org.apache.calcite.plan.RelOptRule.{any, operand}
@@ -30,6 +31,10 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexProgram}
 
+import java.util
+
+import scala.collection.JavaConversions._
+
 /**
   * Base implementation for both
   * [[org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecLookupJoinRule]] and
@@ -86,6 +91,23 @@ trait CommonLookupJoinRule {
     }
   }
 
+  /**
+    * Throws a [[TableException]] if a join key uses `IS NOT DISTINCT FROM` (or its expanded
+    * form). TODO Support `IS NOT DISTINCT FROM` in the future: FLINK-13509
+    */
+  protected def validateJoin(join: FlinkLogicalJoin): Unit = {
+    // Out-parameter handed to createJoinInfo; a `false` entry is what this check rejects.
+    val filterNulls = new util.ArrayList[java.lang.Boolean]
+    JoinUtil.createJoinInfo(join.getLeft, join.getRight, join.getCondition, filterNulls)
+    // Query the Java list directly: no implicit JavaConversions wrapper, no array copy.
+    if (filterNulls.contains(java.lang.Boolean.FALSE)) {
+      throw new TableException(
+        s"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+          s"alternative '(a = b) or (a IS NULL AND b IS NULL)'), the join condition is " +
+          s"'${join.getCondition}'")
+    }
+  }
+
   protected def transform(
     join: FlinkLogicalJoin,
     input: FlinkLogicalRel,
@@ -115,6 +137,7 @@ abstract class BaseSnapshotOnTableScanRule(description: String)
     val tableScan = call.rel[RelNode](3)
     val tableSource = findTableSource(tableScan).orNull
 
+    validateJoin(join)
     val temporalJoin = transform(join, input, tableSource, None)
     call.transformTo(temporalJoin)
   }
@@ -145,6 +168,7 @@ abstract class BaseSnapshotOnCalcTableScanRule(description: String)
     val tableScan = call.rel[RelNode](4)
     val tableSource = findTableSource(tableScan).orNull
 
+    validateJoin(join)
     val temporalJoin = transform(
       join, input, tableSource, Some(calc.getProgram))
     call.transformTo(temporalJoin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
index 64e1a8b..ae0414dd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
@@ -86,6 +86,28 @@ class LookupJoinTest extends TableTestBase {
   }
 
   @Test
+  def testNotDistinctFromInJoinCondition(): Unit = {
+    // Both queries are passed to expectExceptionThrown with the same expected message and
+    // the same exception class (TableException).
+    val expectedMessage =
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')"
+
+    // case 1: the join condition uses `IS NOT DISTINCT FROM` directly
+    val notDistinctFromQuery =
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT  DISTINCT FROM D.id"
+    expectExceptionThrown(notDistinctFromQuery, expectedMessage, classOf[TableException])
+
+    // case 2: the join condition spells out the expanded equivalent,
+    // i.e. `a = b OR (a IS NULL AND b IS NULL)`
+    val expandedQuery =
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)"
+    expectExceptionThrown(expandedQuery, expectedMessage, classOf[TableException])
+  }
+
+  @Test
   def testLogicalPlan(): Unit = {
     val sql1 =
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 2b21265..5bf012b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -101,6 +101,28 @@ class LookupJoinTest extends TableTestBase with Serializable {
   }
 
   @Test
+  def testNotDistinctFromInJoinCondition(): Unit = {
+    // Both queries are passed to expectExceptionThrown with the same expected message and
+    // the same exception class (TableException).
+    val expectedMessage =
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')"
+
+    // case 1: the join condition uses `IS NOT DISTINCT FROM` directly
+    val notDistinctFromQuery =
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT  DISTINCT FROM D.id"
+    expectExceptionThrown(notDistinctFromQuery, expectedMessage, classOf[TableException])
+
+    // case 2: the join condition spells out the expanded equivalent,
+    // i.e. `a = b OR (a IS NULL AND b IS NULL)`
+    val expandedQuery =
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)"
+    expectExceptionThrown(expandedQuery, expectedMessage, classOf[TableException])
+  }
+
+  @Test
   def testInvalidLookupTableFunction(): Unit = {
     streamUtil.addDataStream[(Int, String, Long, Timestamp)](
       "T", 'a, 'b, 'c, 'ts, 'proctime.proctime)