You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2019/08/07 02:25:05 UTC
[flink] branch master updated (5e5f5be -> 78d1e68)
This is an automated email from the ASF dual-hosted git repository.
jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.
from 5e5f5be [FLINK-13044][s3][fs] Fix handling of relocated amazon classes
new 4b48eb8 [FLINK-13433][table-planner-blink] Do not fetch data from LookupableTableSource if the JoinKey in left side of LookupJoin contains null value
new 78d1e68 [FLINK-13509][table-planner-blink] Forbidden `IS NOT DISTINCT FROM `(or an expanded version) in LookupJoin.
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../planner/codegen/LookupJoinCodeGenerator.scala | 32 +++-
.../plan/nodes/common/CommonLookupJoin.scala | 2 +-
.../physical/common/CommonLookupJoinRule.scala | 24 +++
.../plan/batch/sql/join/LookupJoinTest.scala | 22 +++
.../plan/stream/sql/join/LookupJoinTest.scala | 22 +++
.../runtime/batch/sql/join/LookupJoinITCase.scala | 173 ++++++++++++---------
.../runtime/stream/sql/LookupJoinITCase.scala | 119 ++++++++++++--
.../utils/InMemoryLookupableTableSource.scala | 7 +-
8 files changed, 308 insertions(+), 93 deletions(-)
[flink] 01/02: [FLINK-13433][table-planner-blink] Do not fetch data
from LookupableTableSource if the JoinKey in left side of LookupJoin
contains null value
Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 4b48eb89283f1d61514a4a52263d8291b724da90
Author: beyond1920 <be...@126.com>
AuthorDate: Wed Jul 31 13:08:17 2019 +0800
[FLINK-13433][table-planner-blink] Do not fetch data from LookupableTableSource if the JoinKey in left side of LookupJoin contains null value
This closes #9285
---
.../planner/codegen/LookupJoinCodeGenerator.scala | 32 +++-
.../plan/nodes/common/CommonLookupJoin.scala | 2 +-
.../runtime/batch/sql/join/LookupJoinITCase.scala | 173 ++++++++++++---------
.../runtime/stream/sql/LookupJoinITCase.scala | 119 ++++++++++++--
.../utils/InMemoryLookupableTableSource.scala | 7 +-
5 files changed, 240 insertions(+), 93 deletions(-)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
index 044d8dd..9a9bf2e 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/codegen/LookupJoinCodeGenerator.scala
@@ -65,7 +65,7 @@ object LookupJoinCodeGenerator {
: GeneratedFunction[FlatMapFunction[BaseRow, BaseRow]] = {
val ctx = CodeGeneratorContext(config)
- val (prepareCode, parameters) = prepareParameters(
+ val (prepareCode, parameters, nullInParameters) = prepareParameters(
ctx,
typeFactory,
inputType,
@@ -87,11 +87,17 @@ object LookupJoinCodeGenerator {
s"$lookupFunctionTerm.setCollector($DEFAULT_COLLECTOR_TERM);"
}
+ // TODO: skip the lookup for any record with a null in its join key, because
+ // "IS NOT DISTINCT FROM" is not supported yet.
val body =
s"""
|$prepareCode
|$setCollectorCode
- |$lookupFunctionTerm.eval($parameters);
+ |if ($nullInParameters) {
+ | return;
+ |} else {
+ | $lookupFunctionTerm.eval($parameters);
+ | }
""".stripMargin
FunctionCodeGenerator.generateFunction(
@@ -118,7 +124,7 @@ object LookupJoinCodeGenerator {
: GeneratedFunction[AsyncFunction[BaseRow, AnyRef]] = {
val ctx = CodeGeneratorContext(config)
- val (prepareCode, parameters) = prepareParameters(
+ val (prepareCode, parameters, nullInParameters) = prepareParameters(
ctx,
typeFactory,
inputType,
@@ -130,11 +136,18 @@ object LookupJoinCodeGenerator {
val lookupFunctionTerm = ctx.addReusableFunction(asyncLookupFunction)
val DELEGATE = className[DelegatingResultFuture[_]]
+ // TODO: skip the lookup for any record with a null in its join key, because
+ // "IS NOT DISTINCT FROM" is not supported yet.
val body =
s"""
|$prepareCode
- |$DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
- |$lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters);
+ |if ($nullInParameters) {
+ | $DEFAULT_COLLECTOR_TERM.complete(java.util.Collections.emptyList());
+ | return;
+ |} else {
+ | $DELEGATE delegates = new $DELEGATE($DEFAULT_COLLECTOR_TERM);
+ | $lookupFunctionTerm.eval(delegates.getCompletableFuture(), $parameters);
+ |}
""".stripMargin
FunctionCodeGenerator.generateFunction(
@@ -156,7 +169,7 @@ object LookupJoinCodeGenerator {
lookupKeyInOrder: Array[Int],
allLookupFields: Map[Int, LookupKey],
isExternalArgs: Boolean,
- fieldCopy: Boolean): (String, String) = {
+ fieldCopy: Boolean): (String, String, String) = {
val inputFieldExprs = for (i <- lookupKeyInOrder) yield {
allLookupFields.get(i) match {
@@ -195,9 +208,12 @@ object LookupJoinCodeGenerator {
| $newTerm = $assign;
|}
""".stripMargin
- (code, newTerm)
+ (code, newTerm, e.nullTerm)
}
- (codeAndArg.map(_._1).mkString("\n"), codeAndArg.map(_._2).mkString(", "))
+ (
+ codeAndArg.map(_._1).mkString("\n"),
+ codeAndArg.map(_._2).mkString(", "),
+ codeAndArg.map(_._3).mkString("|| "))
}
/**
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
index 7a2b133..88800da 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
@@ -560,7 +560,7 @@ abstract class CommonLookupJoin(
joinType: JoinRelType): Unit = {
// check join on all fields of PRIMARY KEY or (UNIQUE) INDEX
- if (allLookupKeys.isEmpty || allLookupKeys.isEmpty) {
+ if (allLookupKeys.isEmpty) {
throw new TableException(
"Temporal table join requires an equality condition on fields of " +
s"table [${tableSource.explainSource()}].")
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
index fe141f9..6fac55e 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/LookupJoinITCase.scala
@@ -22,9 +22,17 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo
import org.apache.flink.table.api.Types
import org.apache.flink.table.planner.runtime.utils.{BatchTableEnvUtil, BatchTestBase, InMemoryLookupableTableSource}
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
import org.junit.{Before, Test}
-class LookupJoinITCase extends BatchTestBase {
+import java.lang.Boolean
+import java.util
+
+import scala.collection.JavaConversions._
+
+@RunWith(classOf[Parameterized])
+class LookupJoinITCase(isAsyncMode: Boolean) extends BatchTestBase {
val data = List(
BatchTestBase.row(1L, 12L, "Julian"),
@@ -33,6 +41,12 @@ class LookupJoinITCase extends BatchTestBase {
BatchTestBase.row(8L, 11L, "Hello world"),
BatchTestBase.row(9L, 12L, "Hello world!"))
+ val dataWithNull = List(
+ BatchTestBase.row(null, 15L, "Hello"),
+ BatchTestBase.row(3L, 15L, "Fabian"),
+ BatchTestBase.row(null, 11L, "Hello world"),
+ BatchTestBase.row(9L, 12L, "Hello world!"))
+
val typeInfo = new RowTypeInfo(LONG_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO)
val userData = List(
@@ -55,19 +69,72 @@ class LookupJoinITCase extends BatchTestBase {
.enableAsync()
.build()
+ val userDataWithNull = List(
+ (11, 1L, "Julian"),
+ (22, null, "Hello"),
+ (33, 3L, "Fabian"),
+ (44, null, "Hello world"))
+
+ val userWithNullDataTableSource = InMemoryLookupableTableSource.builder()
+ .data(userDataWithNull)
+ .field("age", Types.INT)
+ .field("id", Types.LONG)
+ .field("name", Types.STRING)
+ .build()
+
+ val userAsyncWithNullDataTableSource = InMemoryLookupableTableSource.builder()
+ .data(userDataWithNull)
+ .field("age", Types.INT)
+ .field("id", Types.LONG)
+ .field("name", Types.STRING)
+ .enableAsync()
+ .build()
+
+ var userTable: String = _
+ var userTableWithNull: String = _
+
@Before
override def before() {
super.before()
BatchTableEnvUtil.registerCollection(tEnv, "T0", data, typeInfo, "id, len, content")
val myTable = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime FROM T0")
tEnv.registerTable("T", myTable)
+
+ BatchTableEnvUtil.registerCollection(
+ tEnv, "T1", dataWithNull, typeInfo, "id, len, content")
+ val myTable1 = tEnv.sqlQuery("SELECT *, PROCTIME() as proctime FROM T1")
+ tEnv.registerTable("nullableT", myTable1)
+
tEnv.registerTableSource("userTable", userTableSource)
tEnv.registerTableSource("userAsyncTable", userAsyncTableSource)
+ userTable = if (isAsyncMode) "userAsyncTable" else "userTable"
+
+ tEnv.registerTableSource("userWithNullDataTable", userWithNullDataTableSource)
+ tEnv.registerTableSource("userWithNullDataAsyncTable", userAsyncWithNullDataTableSource)
+ userTableWithNull = if (isAsyncMode) "userWithNullDataAsyncTable" else "userWithNullDataTable"
+
+ // TODO: re-enable object reuse once [FLINK-12351] is fixed.
+ env.getConfig.disableObjectReuse()
+ }
+
+ @Test
+ def testLeftJoinTemporalTableWithLocalPredicate(): Unit = {
+ val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN $userTable " +
+ "for system_time as of T.proctime AS D ON T.id = D.id " +
+ "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " +
+ "WHERE T.id > 1"
+
+ val expected = Seq(
+ BatchTestBase.row(2, 15, "Hello", null, null),
+ BatchTestBase.row(3, 15, "Fabian", "Fabian", 33),
+ BatchTestBase.row(8, 11, "Hello world", null, null),
+ BatchTestBase.row(9, 12, "Hello world!", null, null))
+ checkResult(sql, expected, false)
}
@Test
def testJoinTemporalTable(): Unit = {
- val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " +
+ val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " +
"for system_time as of T.proctime AS D ON T.id = D.id"
val expected = Seq(
@@ -79,7 +146,7 @@ class LookupJoinITCase extends BatchTestBase {
@Test
def testJoinTemporalTableWithPushDown(): Unit = {
- val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userTable " +
+ val sql = s"SELECT T.id, T.len, T.content, D.name FROM T JOIN $userTable " +
"for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20"
val expected = Seq(
@@ -90,7 +157,7 @@ class LookupJoinITCase extends BatchTestBase {
@Test
def testJoinTemporalTableWithNonEqualFilter(): Unit = {
- val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN userTable " +
+ val sql = s"SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN $userTable " +
"for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age"
val expected = Seq(
@@ -101,7 +168,7 @@ class LookupJoinITCase extends BatchTestBase {
@Test
def testJoinTemporalTableOnMultiFields(): Unit = {
- val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+ val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
"for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name"
val expected = Seq(
@@ -112,7 +179,7 @@ class LookupJoinITCase extends BatchTestBase {
@Test
def testJoinTemporalTableOnMultiFieldsWithUdf(): Unit = {
- val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+ val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
"for system_time as of T.proctime AS D ON mod(T.id, 4) = D.id AND T.content = D.name"
val expected = Seq(
@@ -123,7 +190,7 @@ class LookupJoinITCase extends BatchTestBase {
@Test
def testJoinTemporalTableOnMultiKeyFields(): Unit = {
- val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+ val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
"for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
val expected = Seq(
@@ -134,7 +201,7 @@ class LookupJoinITCase extends BatchTestBase {
@Test
def testLeftJoinTemporalTable(): Unit = {
- val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN userTable " +
+ val sql = s"SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN $userTable " +
"for system_time as of T.proctime AS D ON T.id = D.id"
val expected = Seq(
@@ -147,88 +214,50 @@ class LookupJoinITCase extends BatchTestBase {
}
@Test
- def testAsyncJoinTemporalTable(): Unit = {
- // TODO: enable object reuse until [FLINK-12351] is fixed.
- env.getConfig.disableObjectReuse()
- val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userAsyncTable " +
- "for system_time as of T.proctime AS D ON T.id = D.id"
-
- val expected = Seq(
- BatchTestBase.row(1, 12, "Julian", "Julian"),
- BatchTestBase.row(2, 15, "Hello", "Jark"),
- BatchTestBase.row(3, 15, "Fabian", "Fabian"))
- checkResult(sql, expected, false)
- }
-
- @Test
- def testAsyncJoinTemporalTableWithPushDown(): Unit = {
- // TODO: enable object reuse until [FLINK-12351] is fixed.
- env.getConfig.disableObjectReuse()
- val sql = "SELECT T.id, T.len, T.content, D.name FROM T JOIN userAsyncTable " +
- "for system_time as of T.proctime AS D ON T.id = D.id AND D.age > 20"
+ def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+ val sql = s"SELECT T.id, T.len, D.name FROM nullableT T JOIN $userTableWithNull " +
+ "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
val expected = Seq(
- BatchTestBase.row(2, 15, "Hello", "Jark"),
- BatchTestBase.row(3, 15, "Fabian", "Fabian"))
+ BatchTestBase.row(3,15,"Fabian"))
checkResult(sql, expected, false)
}
@Test
- def testAsyncJoinTemporalTableWithNonEqualFilter(): Unit = {
- // TODO: enable object reuse until [FLINK-12351] is fixed.
- env.getConfig.disableObjectReuse()
- val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T JOIN userAsyncTable " +
- "for system_time as of T.proctime AS D ON T.id = D.id WHERE T.len <= D.age"
-
+ def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+ val sql = s"SELECT D.id, T.len, D.name FROM nullableT T LEFT JOIN $userTableWithNull " +
+ "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
val expected = Seq(
- BatchTestBase.row(2, 15, "Hello", "Jark", 22),
- BatchTestBase.row(3, 15, "Fabian", "Fabian", 33))
+ BatchTestBase.row(null,15,null),
+ BatchTestBase.row(3,15,"Fabian"),
+ BatchTestBase.row(null,11,null),
+ BatchTestBase.row(null,12,null))
checkResult(sql, expected, false)
}
@Test
- def testAsyncLeftJoinTemporalTableWithLocalPredicate(): Unit = {
- // TODO: enable object reuse until [FLINK-12351] is fixed.
- env.getConfig.disableObjectReuse()
- val sql = "SELECT T.id, T.len, T.content, D.name, D.age FROM T LEFT JOIN userAsyncTable " +
- "for system_time as of T.proctime AS D ON T.id = D.id " +
- "AND T.len > 1 AND D.age > 20 AND D.name = 'Fabian' " +
- "WHERE T.id > 1"
-
- val expected = Seq(
- BatchTestBase.row(2, 15, "Hello", null, null),
- BatchTestBase.row(3, 15, "Fabian", "Fabian", 33),
- BatchTestBase.row(8, 11, "Hello world", null, null),
- BatchTestBase.row(9, 12, "Hello world!", null, null))
+ def testJoinTemporalTableOnNullConstantKey(): Unit = {
+ val sql = s"SELECT T.id, T.len, T.content FROM T JOIN $userTable " +
+ "for system_time as of T.proctime AS D ON D.id = null"
+ val expected = Seq()
checkResult(sql, expected, false)
}
@Test
- def testAsyncJoinTemporalTableOnMultiFields(): Unit = {
- // TODO: enable object reuse until [FLINK-12351] is fixed.
- env.getConfig.disableObjectReuse()
- val sql = "SELECT T.id, T.len, D.name FROM T JOIN userAsyncTable " +
- "for system_time as of T.proctime AS D ON T.id = D.id AND T.content = D.name"
-
- val expected = Seq(
- BatchTestBase.row(1, 12, "Julian"),
- BatchTestBase.row(3, 15, "Fabian"))
+ def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
+ val sql = s"SELECT T.id, T.len, D.name FROM T JOIN $userTable " +
+ "for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id"
+ val expected = Seq()
checkResult(sql, expected, false)
}
+}
- @Test
- def testAsyncLeftJoinTemporalTable(): Unit = {
- // TODO: enable object reuse until [FLINK-12351] is fixed.
- env.getConfig.disableObjectReuse()
- val sql = "SELECT T.id, T.len, D.name, D.age FROM T LEFT JOIN userAsyncTable " +
- "for system_time as of T.proctime AS D ON T.id = D.id"
+object LookupJoinITCase {
- val expected = Seq(
- BatchTestBase.row(1, 12, "Julian", 11),
- BatchTestBase.row(2, 15, "Jark", 22),
- BatchTestBase.row(3, 15, "Fabian", 33),
- BatchTestBase.row(8, 11, null, null),
- BatchTestBase.row(9, 12, null, null))
- checkResult(sql, expected, false)
+ @Parameterized.Parameters(name = "isAsyncMode = {0}")
+ def parameters(): util.Collection[Array[java.lang.Object]] = {
+ Seq[Array[AnyRef]](
+ Array(Boolean.TRUE), Array(Boolean.FALSE)
+ )
}
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index c0dbdc7..f145e2b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -26,7 +26,7 @@ import org.apache.flink.table.planner.runtime.utils.UserDefinedFunctionTestUtils
import org.apache.flink.table.planner.runtime.utils.{InMemoryLookupableTableSource, StreamingTestBase, TestingAppendSink}
import org.apache.flink.types.Row
-import org.junit.Assert.assertEquals
+import org.junit.Assert.{assertEquals, assertTrue}
import org.junit.Test
import java.lang.{Integer => JInt, Long => JLong}
@@ -46,6 +46,11 @@ class LookupJoinITCase extends StreamingTestBase {
Row.of(null, new JInt(11), "Hello world"),
Row.of(new JLong(9), new JInt(12), "Hello world!"))
+ val dataRowType:TypeInformation[Row] = new RowTypeInfo(
+ BasicTypeInfo.LONG_TYPE_INFO,
+ BasicTypeInfo.INT_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO)
+
val userData = List(
(11, 1L, "Julian"),
(22, 2L, "Jark"),
@@ -65,6 +70,20 @@ class LookupJoinITCase extends StreamingTestBase {
.field("name", Types.STRING)
.build()
+ val userDataWithNull = List(
+ (11, 1L, "Julian"),
+ (22, null, "Hello"),
+ (33, 3L, "Fabian"),
+ (44, null, "Hello world")
+ )
+
+ val userWithNullDataTableSourceWith2Keys = InMemoryLookupableTableSource.builder()
+ .data(userDataWithNull)
+ .field("age", Types.INT)
+ .field("id", Types.LONG)
+ .field("name", Types.STRING)
+ .build()
+
@Test
def testJoinTemporalTable(): Unit = {
val streamTable = env.fromCollection(data)
@@ -137,11 +156,7 @@ class LookupJoinITCase extends StreamingTestBase {
@Test
def testJoinTemporalTableOnNullableKey(): Unit = {
-
- implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO)
+ implicit val tpe: TypeInformation[Row] = dataRowType
val streamTable = env.fromCollection(dataWithNull)
.toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
tEnv.registerTable("T", streamTable)
@@ -366,11 +381,7 @@ class LookupJoinITCase extends StreamingTestBase {
@Test
def testLeftJoinTemporalTableOnNullableKey(): Unit = {
-
- implicit val tpe: TypeInformation[Row] = new RowTypeInfo(
- BasicTypeInfo.LONG_TYPE_INFO,
- BasicTypeInfo.INT_TYPE_INFO,
- BasicTypeInfo.STRING_TYPE_INFO)
+ implicit val tpe: TypeInformation[Row] = dataRowType
val streamTable = env.fromCollection(dataWithNull)
.toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
tEnv.registerTable("T", streamTable)
@@ -418,4 +429,90 @@ class LookupJoinITCase extends StreamingTestBase {
assertEquals(0, userTableSource.getResourceCounter)
}
+ @Test
+ def testJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+ implicit val tpe: TypeInformation[Row] = dataRowType
+ val streamTable = env.fromCollection(dataWithNull)
+ .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+ tEnv.registerTable("T", streamTable)
+
+ tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+ val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+ "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
+
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = Seq(
+ "3,15,Fabian")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
+ }
+
+ @Test
+ def testLeftJoinTemporalTableOnMultiKeyFieldsWithNullData(): Unit = {
+ implicit val tpe: TypeInformation[Row] = dataRowType
+ val streamTable = env.fromCollection(dataWithNull)
+ .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+ tEnv.registerTable("T", streamTable)
+
+ tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+ val sql = "SELECT D.id, T.len, D.name FROM T LEFT JOIN userTable " +
+ "for system_time as of T.proctime AS D ON T.content = D.name AND T.id = D.id"
+
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = Seq(
+ "null,15,null",
+ "3,15,Fabian",
+ "null,11,null",
+ "null,12,null")
+ assertEquals(expected.sorted, sink.getAppendResults.sorted)
+ assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
+ }
+
+ @Test
+ def testJoinTemporalTableOnNullConstantKey(): Unit = {
+ implicit val tpe: TypeInformation[Row] = dataRowType
+ val streamTable = env.fromCollection(dataWithNull)
+ .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+ tEnv.registerTable("T", streamTable)
+
+ tEnv.registerTableSource("userTable", userWithNullDataTableSourceWith2Keys)
+
+ val sql = "SELECT T.id, T.len, T.content FROM T JOIN userTable " +
+ "for system_time as of T.proctime AS D ON D.id = null"
+
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ assertTrue(sink.getAppendResults.isEmpty)
+ assertEquals(0, userTableSource.getResourceCounter)
+ }
+
+ @Test
+ def testJoinTemporalTableOnMultiKeyFieldsWithNullConstantKey(): Unit = {
+ val streamTable = env.fromCollection(data)
+ .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+ tEnv.registerTable("T", streamTable)
+
+ tEnv.registerTableSource("userTable", userTableSourceWith2Keys)
+
+ val sql = "SELECT T.id, T.len, D.name FROM T JOIN userTable " +
+ "for system_time as of T.proctime AS D ON T.content = D.name AND null = D.id"
+
+ val sink = new TestingAppendSink
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ assertTrue(sink.getAppendResults.isEmpty)
+ assertEquals(0, userTableSourceWith2Keys.getResourceCounter)
+ }
+
}
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
index 33d1f60..9997741 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/utils/InMemoryLookupableTableSource.scala
@@ -205,6 +205,8 @@ object InMemoryLookupableTableSource {
@varargs
def eval(inputs: AnyRef*): Unit = {
val key = Row.of(inputs: _*)
+ Preconditions.checkArgument(!inputs.contains(null),
+ s"Lookup key %s contains null value, which would not happen.", key)
data.get(key) match {
case Some(list) => list.foreach(result => collect(result))
case None => // do nothing
@@ -236,8 +238,11 @@ object InMemoryLookupableTableSource {
@varargs
def eval(resultFuture: CompletableFuture[util.Collection[Row]], inputs: AnyRef*): Unit = {
+ val key = Row.of(inputs: _*)
+ Preconditions.checkArgument(!inputs.contains(null),
+ s"Lookup key %s contains null value, which would not happen.", key)
CompletableFuture
- .supplyAsync(new CollectionSupplier(data, Row.of(inputs: _*)), executor)
+ .supplyAsync(new CollectionSupplier(data, key), executor)
.thenAccept(new CollectionConsumer(resultFuture))
}
[flink] 02/02: [FLINK-13509][table-planner-blink] Forbidden `IS NOT
DISTINCT FROM `(or an expanded version) in LookupJoin.
Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 78d1e68ab120e2fecb931e6d9ea1ee8bef247cde
Author: beyond1920 <be...@126.com>
AuthorDate: Fri Aug 2 10:13:45 2019 +0800
[FLINK-13509][table-planner-blink] Forbidden `IS NOT DISTINCT FROM `(or an expanded version) in LookupJoin.
---
.../physical/common/CommonLookupJoinRule.scala | 24 ++++++++++++++++++++++
.../plan/batch/sql/join/LookupJoinTest.scala | 22 ++++++++++++++++++++
.../plan/stream/sql/join/LookupJoinTest.scala | 22 ++++++++++++++++++++
3 files changed, 68 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
index 08510b2..62bd3dd 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/common/CommonLookupJoinRule.scala
@@ -22,6 +22,7 @@ import org.apache.flink.table.planner.plan.nodes.common.CommonLookupJoin
import org.apache.flink.table.planner.plan.nodes.logical._
import org.apache.flink.table.planner.plan.nodes.physical.PhysicalTableSourceScan
import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
+import org.apache.flink.table.planner.plan.utils.JoinUtil
import org.apache.flink.table.sources.{LookupableTableSource, TableSource}
import org.apache.calcite.plan.RelOptRule.{any, operand}
@@ -30,6 +31,10 @@ import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.core.TableScan
import org.apache.calcite.rex.{RexCorrelVariable, RexFieldAccess, RexProgram}
+import java.util
+
+import scala.collection.JavaConversions._
+
/**
* Base implementation for both
* [[org.apache.flink.table.planner.plan.rules.physical.batch.BatchExecLookupJoinRule]] and
@@ -86,6 +91,23 @@ trait CommonLookupJoinRule {
}
}
+ // TODO Support `IS NOT DISTINCT FROM` in the future: FLINK-13509
+ protected def validateJoin(join: FlinkLogicalJoin): Unit = {
+
+ val filterNulls: Array[Boolean] = {
+ val filterNulls = new util.ArrayList[java.lang.Boolean]
+ JoinUtil.createJoinInfo(join.getLeft, join.getRight, join.getCondition, filterNulls)
+ filterNulls.map(_.booleanValue()).toArray
+ }
+
+ if (filterNulls.contains(false)) {
+ throw new TableException(
+ s"LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+ s"alternative '(a = b) or (a IS NULL AND b IS NULL)'), the join condition is " +
+ s"'${join.getCondition}'")
+ }
+ }
+
protected def transform(
join: FlinkLogicalJoin,
input: FlinkLogicalRel,
@@ -115,6 +137,7 @@ abstract class BaseSnapshotOnTableScanRule(description: String)
val tableScan = call.rel[RelNode](3)
val tableSource = findTableSource(tableScan).orNull
+ validateJoin(join)
val temporalJoin = transform(join, input, tableSource, None)
call.transformTo(temporalJoin)
}
@@ -145,6 +168,7 @@ abstract class BaseSnapshotOnCalcTableScanRule(description: String)
val tableScan = call.rel[RelNode](4)
val tableSource = findTableSource(tableScan).orNull
+ validateJoin(join)
val temporalJoin = transform(
join, input, tableSource, Some(calc.getProgram))
call.transformTo(temporalJoin)
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
index 64e1a8b..ae0414dd 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/batch/sql/join/LookupJoinTest.scala
@@ -86,6 +86,28 @@ class LookupJoinTest extends TableTestBase {
}
@Test
+ def testNotDistinctFromInJoinCondition(): Unit = {
+
+ // a join condition containing `IS NOT DISTINCT FROM` is not supported
+ expectExceptionThrown(
+ "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id",
+ "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+ "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+ classOf[TableException]
+ )
+
+ // the expanded form `a = b OR (a IS NULL AND b IS NULL)` is not supported either
+ expectExceptionThrown(
+ "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)",
+ "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+ "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+ classOf[TableException]
+ )
+ }
+
+ @Test
def testLogicalPlan(): Unit = {
val sql1 =
"""
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index 2b21265..5bf012b 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -101,6 +101,28 @@ class LookupJoinTest extends TableTestBase with Serializable {
}
@Test
+ def testNotDistinctFromInJoinCondition(): Unit = {
+
+ // a join condition containing `IS NOT DISTINCT FROM` is not supported
+ expectExceptionThrown(
+ "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a IS NOT DISTINCT FROM D.id",
+ "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+ "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+ classOf[TableException]
+ )
+
+ // the expanded form `a = b OR (a IS NULL AND b IS NULL)` is not supported either
+ expectExceptionThrown(
+ "SELECT * FROM MyTable AS T LEFT JOIN temporalTest " +
+ "FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id OR (T.a IS NULL AND D.id IS NULL)",
+ "LookupJoin doesn't support join condition contains 'a IS NOT DISTINCT FROM b' (or " +
+ "alternative '(a = b) or (a IS NULL AND b IS NULL)')",
+ classOf[TableException]
+ )
+ }
+
+ @Test
def testInvalidLookupTableFunction(): Unit = {
streamUtil.addDataStream[(Int, String, Long, Timestamp)](
"T", 'a, 'b, 'c, 'ts, 'proctime.proctime)