You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 02:25:05 UTC

[flink] branch master updated (5e5f5be -> 78d1e68)

This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from 5e5f5be  [FLINK-13044][s3][fs] Fix handling of relocated amazon classes
     new 4b48eb8  [FLINK-13433][table-planner-blink] Do not fetch data from LookupableTableSource if the JoinKey in left side of LookupJoin contains null value
     new 78d1e68  [FLINK-13509][table-planner-blink] Forbid `IS NOT DISTINCT FROM` (or an expanded version) in LookupJoin.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../planner/codegen/LookupJoinCodeGenerator.scala  |  32 +++-
 .../plan/nodes/common/CommonLookupJoin.scala       |   2 +-
 .../physical/common/CommonLookupJoinRule.scala     |  24 +++
 .../plan/batch/sql/join/LookupJoinTest.scala       |  22 +++
 .../plan/stream/sql/join/LookupJoinTest.scala      |  22 +++
 .../runtime/batch/sql/join/LookupJoinITCase.scala  | 173 ++++++++++++---------
 .../runtime/stream/sql/LookupJoinITCase.scala      | 119 ++++++++++++--
 .../utils/InMemoryLookupableTableSource.scala      |   7 +-
 8 files changed, 308 insertions(+), 93 deletions(-)


[flink] 01/02: [FLINK-13433][table-planner-blink] Do not fetch data from LookupableTableSource if the JoinKey in left side of LookupJoin contains null value

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4b48eb89283f1d61514a4a52263d8291b724da90
Author: beyond1920 <be...@126.com>
AuthorDate: Wed Jul 31 13:08:17 2019 +0800

    [FLINK-13433][table-planner-blink] Do not fetch data from LookupableTableSource if the JoinKey in left side of LookupJoin contains null value
    
    This closes #9285
---
 .../planner/codegen/LookupJoinCodeGenerator.scala  |  32 +++-
 .../plan/nodes/common/CommonLookupJoin.scala       |   2 +-
 .../runtime/batch/sql/join/LookupJoinITCase.scala  | 173 ++++++++++++---------
 .../runtime/stream/sql/LookupJoinITCase.scala      | 119 ++++++++++++--
 .../utils/InMemoryLookupableTableSource.scala      |   7 +-
 5 files changed, 240 insertions(+), 93 deletions(-)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 044d8dd..9a9bf2e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -65,7 +65,7 @@ object LookupJoinCodeGenerator {
     : GeneratedFunction[FlatMapFunction[BaseRow, BaseRow]] = {
 
     val ctx = CodeGeneratorContext(config)
-    val (prepareCode, parameters) = prepareParameters(
+    val (prepareCode, parameters, nullInParameters) = prepareParameters(
       ctx,
       typeFactory,
       inputType,
@@ -87,11 +87,17 @@ object LookupJoinCodeGenerator {
         s"$lookupFunctionTerm.setCollector($DEFAULT_COLLECTOR_TERM);"
     }
 
+    // TODO: filter all records when there is any nulls on the join key, because
+    //  "IS NOT DISTINCT FROM" is not supported yet.
     val body =
       s"""
          |$prepareCode
          |$setCollectorCode
-         |$lookupFunctionTerm.eval($parameters);
+         |if ($nullInParameters) {
+         |  return;
+         |} else {
+         |  $lookupFunctionTerm.eval($parameters);
+         | }
       """.stripMargin
 
     FunctionCodeGenerator.generateFunction(
@@ -118,7 +124,7 @@ object LookupJoinCodeGenerator {
     : GeneratedFunction[AsyncFunction[BaseRow, AnyRef]] = {
 
     val ctx = CodeGeneratorContext(config)
-    val (prepareCode, parameters) = prepareParameters(
+    val (prepareCode, parameters, nullInParameters) = prepareParameters(
       ctx,
       typeFactory,
       inputType,
@@ -130,11 +136,18 @@ object LookupJoinCodeGenerator {
     val lookupFunctionTerm = ctx.addReusableFunction(asyncLookupFunction)
     val DELEGATE = className[DelegatingResultFuture[_]]
 
+    // TODO: filter all records when there is any nulls on the join key, because
+    //  "IS NOT DISTINCT FROM" is not supported yet.
     val body =
       s"""
          |$prepareCode
-         |$DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
-         |$lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters);
+         |if ($nullInParameters) {
+         |  $DEFAULT_COLLECTOR_TERM.complete(java.util.Collections.emptyList());
+         |  return;
+         |} else {
+         |  $DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
+         |  $lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters);
+         |}
       """.stripMargin
 
     FunctionCodeGenerator.generateFunction(
@@ -156,7 +169,7 @@ object LookupJoinCodeGenerator {
       lookupKeyInOrder: Array[Int],
       allLookupFields: Map[Int, LookupKey],
       isExternalArgs: Boolean,
-      fieldCopy: Boolean): (String, String) = {
+      fieldCopy: Boolean): (String, String, String) = {
 
     val inputFieldExprs = for (i <- lookupKeyInOrder) yield {
       allLookupFields.get(i) match {
@@ -195,9 +208,12 @@ object LookupJoinCodeGenerator {
              |  $newTerm = $assign;
              |}
              """.stripMargin
-        (code, newTerm)
+        (code, newTerm, e.nullTerm)
       }
-    (codeAndArg.map(_._1).mkString("\n"), codeAndArg.map(_._2).mkString(", "))
+    (
+      codeAndArg.map(_._1).mkString("\n"),
+      codeAndArg.map(_._2).mkString(", "),
+      codeAndArg.map(_._3).mkString("|| "))
   }
 
   /**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
index 7a2b133..88800da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
@@ -560,7 +560,7 @@ abstract class CommonLookupJoin(
       joinType: JoinRelType): Unit = {
 
     // check join on all fields of PRIMARY KEY or (UNIQUE) INDEX
-    if (allLookupKeys.isEmpty || allLookupKeys.isEmpty) {
+    if (allLookupKeys.isEmpty) {
       throw new TableException(
         "Temporal table join requires an equality condition on fields of " +
           s"table [${tableSource.explainSource()}].")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
index fe141f9..6fac55e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -22,9 +22,17 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
 import org.apache.flink.table.api.Types
 import org.apache.flink.table.planner.runtime.utils.{BatchTableEnvUtil, BatchTestBase, InMemoryLookupableTableSource}
 
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
 import org.junit.{Before, Test}
 
-class LookupJoinITCase extends BatchTestBase {
+import java.lang.Boolean
+import java.util
+
+import scala.collection.JavaConversions._
+
+@RunWith(classOf[Parameterized])
+class LookupJoinITCase(isAsyncMode: Boolean) extends BatchTestBase {
 
   val data = List(
     BatchTestBase.row(1L, 12L, "Julian"),
@@ -33,6 +41,12 @@ class LookupJoinITCase extends BatchTestBase {
     BatchTestBase.row(8L, 11L, "Hello world"),
     BatchTestBase.row(9L, 12L, "Hello world!"))
 
+  val dataWithNull = List(
+    BatchTestBase.row(null, 15L, "Hello"),
+    BatchTestBase.row(3L, 15L, "Fabian"),
+    BatchTestBase.row(null, 11L, "Hello world"),
+    BatchTestBase.row(9L, 12L, "Hello world!"))
+
   val typeInfo = new RowTypeInfo(LONG_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)
 
   val userData = List(
@@ -55,19 +69,72 @@ class LookupJoinITCase extends BatchTestBase {
     .enableAsync()
     .build()
 
+  val userDataWithNull = List(
+    (11, 1L, "Julian"),
+    (22, null, "Hello"),
+    (33, 3L, "Fabian"),
+    (44, null, "Hello world"))
+
+  val userWithNullDataTableSource = InMemoryLookupableTableSource.builder()
+    .data(userDataWithNull)
+    .field("age", Types.INT)
+    .field("id", Types.LONG)
+    .field("name", Types.STRING)
+    .build()
+
+  val userAsyncWithNullDataTableSource = InMemoryLookupableTableSource.builder()
+    .data(userDataWithNull)
+    .field("age", Types.INT)
+    .field("id", Types.LONG)
+    .field("name", Types.STRING)
+    .enableAsync()
+    .build()
+
+  var userTable: String = _
+  var userTableWithNull: String = _
+
   @Before
   override def before() {
     super.before()
     BatchTableEnvUtil.registerCollection(tEnv, "T0", data, typeInfo, "id, len, content")
     val myTable = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime  FROM T0")
     tEnv.registerTable("T", myTable)
+
+    BatchTableEnvUtil.registerCollection(
+      tEnv, "T1", dataWithNull, typeInfo, "id, len, content")
+    val myTable1 = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime  FROM T1")
+    tEnv.registerTable("nullableT", myTable1)
+
     tEnv.registerTableSource("userTable", userTableSource)
     tEnv.registerTableSource("userAsyncTable", userAsyncTableSource)
+    userTable = if (isAsyncMode) "userAsyncTable" else "userTable"
+
+    tEnv.registerTableSource("userWithNullDataTable", userWithNullDataTableSource)
+    tEnv.registerTableSource("userWithNullDataAsyncTable", userAsyncWithNullDataTableSource)
+    userTableWithNull = if (isAsyncMode) "userWithNullDataAsyncTable" else "userWithNullDataTable"
+
+    // TODO: re-enable object reuse once [FLINK-12351] is fixed; it is disabled until then.
+    env.getConfig.disableObjectReuse()
+  }
+
+  @Test
+  def testLeftJoinTemporalTableWithLocalPredicate(): Unit = {
+    val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN $userTable " +
+      "for system_time as of T.proctime AS D ON T.id = D.id " +
+      "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " +
+      "WHERE T.id > 1"
+
+    val expected = Seq(
+      BatchTestBase.row(2, 15, "Hello", null, null),
+      BatchTestBase.row(3, 15, "Fabian", "Fabian", 33),
+      BatchTestBase.row(8, 11, "Hello world", null, null),
+      BatchTestBase.row(9, 12, "Hello world!", null, null))
+    checkResult(sql, expected, false)
   }
 
   @Test
   def testJoinTemporalTable(): Unit = {
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.id = D.id"
 
     val expected = Seq(
@@ -79,7 +146,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableWithPushDown(): Unit = {
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20"
 
     val expected = Seq(
@@ -90,7 +157,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableWithNonEqualFilter(): Unit = {
-    val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age"
 
     val expected = Seq(
@@ -101,7 +168,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableOnMultiFields(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name"
 
     val expected = Seq(
@@ -112,7 +179,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableOnMultiFieldsWithUdf(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON mod(T.id, 4) = D.id AND T.content = D.name"
 
     val expected = Seq(
@@ -123,7 +190,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testJoinTemporalTableOnMultiKeyFields(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
 
     val expected = Seq(
@@ -134,7 +201,7 @@ class LookupJoinITCase extends BatchTestBase {
 
   @Test
   def testLeftJoinTemporalTable(): Unit = {
-    val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN userTable " +
+    val sql = s"SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN $userTable " +
       "for system_time as of T.proctime AS D ON T.id = D.id"
 
     val expected = Seq(
@@ -147,88 +214,50 @@ class LookupJoinITCase extends BatchTestBase {
   }
 
   @Test
-  def testAsyncJoinTemporalTable(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id"
-
-    val expected = Seq(
-      BatchTestBase.row(1, 12, "Julian", "Julian"),
-      BatchTestBase.row(2, 15, "Hello", "Jark"),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian"))
-    checkResult(sql, expected, false)
-  }
-
-  @Test
-  def testAsyncJoinTemporalTableWithPushDown(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20"
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    val sql = s"SELECT T.id, T.len, D.name FROM nullableT T JOIN $userTableWithNull " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
 
     val expected = Seq(
-      BatchTestBase.row(2, 15, "Hello", "Jark"),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian"))
+      BatchTestBase.row(3,15,"Fabian"))
     checkResult(sql, expected, false)
   }
 
   @Test
-  def testAsyncJoinTemporalTableWithNonEqualFilter(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age"
-
+  def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    val sql = s"SELECT D.id, T.len, D.name FROM nullableT T LEFT JOIN $userTableWithNull " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
     val expected = Seq(
-      BatchTestBase.row(2, 15, "Hello", "Jark", 22),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian", 33))
+      BatchTestBase.row(null,15,null),
+      BatchTestBase.row(3,15,"Fabian"),
+      BatchTestBase.row(null,11,null),
+      BatchTestBase.row(null,12,null))
     checkResult(sql, expected, false)
   }
 
   @Test
-  def testAsyncLeftJoinTemporalTableWithLocalPredicate(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id " +
-      "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " +
-      "WHERE T.id > 1"
-
-    val expected = Seq(
-      BatchTestBase.row(2, 15, "Hello", null, null),
-      BatchTestBase.row(3, 15, "Fabian", "Fabian", 33),
-      BatchTestBase.row(8, 11, "Hello world", null, null),
-      BatchTestBase.row(9, 12, "Hello world!", null, null))
+  def testJoinTemporalTableOnNullConstantKey(): Unit = {
+    val sql = s"SELECT T.id, T.len, T.content FROM T JOIN $userTable " +
+      "for system_time as of T.proctime AS D ON D.id = null"
+    val expected = Seq()
     checkResult(sql, expected, false)
   }
 
   @Test
-  def testAsyncJoinTemporalTableOnMultiFields(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name"
-
-    val expected = Seq(
-      BatchTestBase.row(1, 12, "Julian"),
-      BatchTestBase.row(3, 15, "Fabian"))
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
+    val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id"
+    val expected = Seq()
     checkResult(sql, expected, false)
   }
+}
 
-  @Test
-  def testAsyncLeftJoinTemporalTable(): Unit = {
-    // TODO: enable object reuse until [FLINK-12351] is fixed.
-    env.getConfig.disableObjectReuse()
-    val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN userAsyncTable " +
-      "for system_time as of T.proctime AS D ON T.id = D.id"
+object LookupJoinITCase {
 
-    val expected = Seq(
-      BatchTestBase.row(1, 12, "Julian", 11),
-      BatchTestBase.row(2, 15, "Jark", 22),
-      BatchTestBase.row(3, 15, "Fabian", 33),
-      BatchTestBase.row(8, 11, null, null),
-      BatchTestBase.row(9, 12, null, null))
-    checkResult(sql, expected, false)
+  @Parameterized.Parameters(name = "isAsyncMode = {0}")
+  def parameters(): util.Collection[Array[java.lang.Object]] = {
+    Seq[Array[AnyRef]](
+      Array(Boolean.TRUE), Array(Boolean.FALSE)
+    )
   }
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index c0dbdc7..f145e2b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils
 import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingTestBase, TestingAppendSink}
 import org.apache.flink.types.Row
 
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, assertTrue}
 import org.junit.Test
 
 import java.lang.{Integer => JInt, Long => JLong}
@@ -46,6 +46,11 @@ class LookupJoinITCase extends StreamingTestBase {
     Row.of(null, new JInt(11), "Hello world"),
     Row.of(new JLong(9), new JInt(12), "Hello world!"))
 
+  val dataRowType:TypeInformation[Row] = new RowTypeInfo(
+    BasicTypeInfo.LONG_TYPE_INFO,
+    BasicTypeInfo.INT_TYPE_INFO,
+    BasicTypeInfo.STRING_TYPE_INFO)
+
   val userData = List(
     (11, 1L, "Julian"),
     (22, 2L, "Jark"),
@@ -65,6 +70,20 @@ class LookupJoinITCase extends StreamingTestBase {
     .field("name", Types.STRING)
     .build()
 
+  val userDataWithNull = List(
+    (11, 1L, "Julian"),
+    (22, null, "Hello"),
+    (33, 3L, "Fabian"),
+    (44, null, "Hello world")
+  )
+
+  val userWithNullDataTableSourceWith2Keys = InMemoryLookupableTableSource.builder()
+    .data(userDataWithNull)
+    .field("age", Types.INT)
+    .field("id", Types.LONG)
+    .field("name", Types.STRING)
+    .build()
+
   @Test
   def testJoinTemporalTable(): Unit = {
     val streamTable = env.fromCollection(data)
@@ -137,11 +156,7 @@ class LookupJoinITCase extends StreamingTestBase {
 
   @Test
   def testJoinTemporalTableOnNullableKey(): Unit = {
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO)
+    implicit val tpe: TypeInformation[Row] = dataRowType
     val streamTable = env.fromCollection(dataWithNull)
       .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
     tEnv.registerTable("T", streamTable)
@@ -366,11 +381,7 @@ class LookupJoinITCase extends StreamingTestBase {
 
   @Test
   def testLeftJoinTemporalTableOnNullableKey(): Unit = {
-
-    implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
-      BasicTypeInfo.LONG_TYPE_INFO,
-      BasicTypeInfo.INT_TYPE_INFO,
-      BasicTypeInfo.STRING_TYPE_INFO)
+    implicit val tpe: TypeInformation[Row] = dataRowType
     val streamTable = env.fromCollection(dataWithNull)
       .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
     tEnv.registerTable("T", streamTable)
@@ -418,4 +429,90 @@ class LookupJoinITCase extends StreamingTestBase {
     assertEquals(0, userTableSource.getResourceCounter)
   }
 
+  @Test
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    implicit val tpe: TypeInformation[Row] = dataRowType
+    val streamTable = env.fromCollection(dataWithNull)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq("3,15,Fabian")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+    // check the counter of the source actually registered as "userTable" above
+    assertEquals(0, userWithNullDataTableSourceWith2Keys.getResourceCounter)
+  }
+
+  @Test
+  def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+    implicit val tpe: TypeInformation[Row] = dataRowType
+    val streamTable = env.fromCollection(dataWithNull)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+    val sql = "SELECT D.id, T.len, D.name FROM T LEFT JOIN userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+    val expected = Seq(
+      "null,15,null",
+      "3,15,Fabian",
+      "null,11,null",
+      "null,12,null")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+    // check the counter of the source actually registered as "userTable" above
+    assertEquals(0, userWithNullDataTableSourceWith2Keys.getResourceCounter)
+  }
+
+  @Test
+  def testJoinTemporalTableOnNullConstantKey(): Unit = {
+    implicit val tpe: TypeInformation[Row] = dataRowType
+    val streamTable = env.fromCollection(dataWithNull)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+    val sql = "SELECT T.id, T.len, T.content FROM T JOIN userTable " +
+      "for system_time as of T.proctime AS D ON D.id = null"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+    assertTrue(sink.getAppendResults.isEmpty)
+    // check the counter of the source actually registered as "userTable" above
+    assertEquals(0, userWithNullDataTableSourceWith2Keys.getResourceCounter)
+  }
+
+  @Test
+  def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
+    val streamTable = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+
+    tEnv.registerTableSource("userTable", userTableSourceWith2Keys)
+
+    val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+      "for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id"
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    assertTrue(sink.getAppendResults.isEmpty)
+    assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
+  }
+
 }
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
index 33d1f60..9997741 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
@@ -205,6 +205,8 @@ object InMemoryLookupableTableSource {
     @varargs
     def eval(inputs: AnyRef*): Unit = {
       val key = Row.of(inputs: _*)
+      Preconditions.checkArgument(!inputs.contains(null),
+        s"Lookup key %s contains null value, which would not happen.", key)
       data.get(key) match {
         case Some(list) => list.foreach(result => collect(result))
         case None => // do nothing
@@ -236,8 +238,11 @@ object InMemoryLookupableTableSource {
 
     @varargs
     def eval(resultFuture: CompletableFuture[util.Collection[Row]], inputs: AnyRef*): Unit = {
+      val key = Row.of(inputs: _*)
+      Preconditions.checkArgument(!inputs.contains(null),
+        s"Lookup key %s contains null value, which would not happen.", key)
       CompletableFuture
-        .supplyAsync(new CollectionSupplier(data, Row.of(inputs: _*)), executor)
+        .supplyAsync(new CollectionSupplier(data, key), executor)
         .thenAccept(new CollectionConsumer(resultFuture))
     }
 


[flink] 02/02: [FLINK-13509][table-planner-blink] Forbid `IS NOT DISTINCT FROM` (or an expanded version) in LookupJoin.

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 78d1e68ab120e2fecb931e6d9ea1ee8bef247cde
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Aug 2 10:13:45 2019 +0800

    [FLINK-13509][table-planner-blink] Forbid `IS NOT DISTINCT FROM` (or an expanded version) in LookupJoin.
---
 .../physical/common/CommonLookupJoinRule.scala     | 24 ++++++++++++++++++++++
 .../plan/batch/sql/join/LookupJoinTest.scala       | 22 ++++++++++++++++++++
 .../plan/stream/sql/join/LookupJoinTest.scala      | 22 ++++++++++++++++++++
 3 files changed, 68 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
index 08510b2..62bd3dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin
 import org.apache.flink.table.planner.plan.nodes.logical._
 import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan
 import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.planner.plan.utils.JoinUtil
 import org.apache.flink.table.sources.{LookupableTableSource, TableSource}
 
 import org.apache.calcite.plan.RelOptRule.{any, operand}
@@ -30,6 +31,10 @@ import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.core.TableScan
 import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexProgram}
 
+import java.util
+
+import scala.collection.JavaConversions._
+
 /**
   * Base implementation for both
   * [[org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecLookupJoinRule]] and
@@ -86,6 +91,23 @@ trait CommonLookupJoinRule {
     }
   }
 
+  // TODO Support `IS NOT DISTINCT FROM` in the future: FLINK-13509
+  protected def validateJoin(join: FlinkLogicalJoin): Unit = {
+
+    val filterNulls: Array[Boolean] = {
+      val filterNulls = new util.ArrayList[java.lang.Boolean]
+      JoinUtil.createJoinInfo(join.getLeft, join.getRight, join.getCondition, filterNulls)
+      filterNulls.map(_.booleanValue()).toArray
+    }
+
+    if (filterNulls.contains(false)) {
+      throw new TableException(
+        s"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+          s"alternative '(a = b) or (a IS NULL AND b IS NULL)'), the join condition is " +
+          s"'${join.getCondition}'")
+    }
+  }
+
   protected def transform(
     join: FlinkLogicalJoin,
     input: FlinkLogicalRel,
@@ -115,6 +137,7 @@ abstract class BaseSnapshotOnTableScanRule(description: String)
     val tableScan = call.rel[RelNode](3)
     val tableSource = findTableSource(tableScan).orNull
 
+    validateJoin(join)
     val temporalJoin = transform(join, input, tableSource, None)
     call.transformTo(temporalJoin)
   }
@@ -145,6 +168,7 @@ abstract class BaseSnapshotOnCalcTableScanRule(description: String)
     val tableScan = call.rel[RelNode](4)
     val tableSource = findTableSource(tableScan).orNull
 
+    validateJoin(join)
     val temporalJoin = transform(
       join, input, tableSource, Some(calc.getProgram))
     call.transformTo(temporalJoin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
index 64e1a8b..ae0414dd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
@@ -86,6 +86,28 @@ class LookupJoinTest extends TableTestBase {
   }
 
   @Test
+  def testNotDistinctFromInJoinCondition(): Unit = {
+
+    // does not support join condition contains `IS NOT DISTINCT`
+    expectExceptionThrown(
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT  DISTINCT FROM D.id",
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+      classOf[TableException]
+    )
+
+    // does not support join condition contains `IS NOT  DISTINCT` and similar syntax
+    expectExceptionThrown(
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)",
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+      classOf[TableException]
+    )
+  }
+
+  @Test
   def testLogicalPlan(): Unit = {
     val sql1 =
       """
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 2b21265..5bf012b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -101,6 +101,28 @@ class LookupJoinTest extends TableTestBase with Serializable {
   }
 
   @Test
+  def testNotDistinctFromInJoinCondition(): Unit = {
+
+    // does not support join condition contains `IS NOT DISTINCT`
+    expectExceptionThrown(
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT  DISTINCT FROM D.id",
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+      classOf[TableException]
+    )
+
+    // does not support join condition contains `IS NOT  DISTINCT` and similar syntax
+    expectExceptionThrown(
+      "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+        "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)",
+      "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+        "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+      classOf[TableException]
+    )
+  }
+
+  @Test
   def testInvalidLookupTableFunction(): Unit = {
     streamUtil.addDataStream[(Int, String, Long, Timestamp)](
       "T", 'a, 'b, 'c, 'ts, 'proctime.proctime)