You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/19 02:08:44 UTC
[flink] branch release-1.10 updated:
[FLINK-18212][table-planner-blink] Fix Lookup Join failed when there is a
UDF equal condition on the column of temporal table
This is an automated email from the ASF dual-hosted git repository.
jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push:
new 13c4b42 [FLINK-18212][table-planner-blink] Fix Lookup Join failed when there is a UDF equal condition on the column of temporal table
13c4b42 is described below
commit 13c4b425228c7be691173deec48024d97fce0dc1
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Aug 19 10:07:22 2020 +0800
[FLINK-18212][table-planner-blink] Fix Lookup Join failed when there is a UDF equal condition on the column of temporal table
This closes #13183
---
.../plan/nodes/common/CommonLookupJoin.scala | 1 +
.../plan/stream/sql/join/LookupJoinTest.xml | 30 ++++++++++++++++++++++
.../plan/stream/sql/join/LookupJoinTest.scala | 14 ++++++++++
.../runtime/stream/sql/LookupJoinITCase.scala | 24 +++++++++++++++++
4 files changed, 69 insertions(+)
diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
index 008532d..7855ce6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
@@ -601,6 +601,7 @@ abstract class CommonLookupJoin(
val (inputRef, literal) = (left, right) match {
case (literal: RexLiteral, ref: RexInputRef) => (ref, literal)
case (ref: RexInputRef, literal: RexLiteral) => (ref, literal)
+ case _ => return // non-constant condition
}
val dataType = FlinkTypeFactory.toLogicalType(inputRef.getType)
constantFieldMap.put(inputRef.getIndex, ConstantLookupKey(dataType, literal))
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index d8ce0a5..6712da1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -350,4 +350,34 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
]]>
</Resource>
</TestCase>
+ <TestCase name="testJoinTemporalTableWithUdfEqualFilter">
+ <Resource name="sql">
+ <![CDATA[
+SELECT
+ T.a, T.b, T.c, D.name
+FROM
+ MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+WHERE CONCAT('Hello-', D.name) = 'Hello-Jark'
+ ]]>
+ </Resource>
+ <Resource name="planBefore">
+ <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], name=[$6])
++- LogicalFilter(condition=[=(CONCAT(_UTF-16LE'Hello-', $6), _UTF-16LE'Hello-Jark')])
+ +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalFilter(condition=[=($cor0.a, $0)])
+ +- LogicalSnapshot(period=[$cor0.proctime])
+ +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+ </Resource>
+ <Resource name="planAfter">
+ <![CDATA[
+Calc(select=[a, b, c, name])
++- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], lookup=[id=a], where=[=(CONCAT(_UTF-16LE'Hello-', name), _UTF-16LE'Hello-Jark':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")], select=[a, b, c, id, name])
+ +- Calc(select=[a, b, c])
+ +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index d64eb4d..98937d0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -394,6 +394,20 @@ class LookupJoinTest extends TableTestBase with Serializable {
streamUtil.verifyPlan(sql)
}
+ @Test
+ def testJoinTemporalTableWithUdfEqualFilter(): Unit = { // plan-level test: lookup join plus a WHERE equality that wraps a temporal-table column in a function call
+ val sql = // D is temporalTest joined FOR SYSTEM_TIME AS OF T.proctime; the filter compares CONCAT(...) of D.name with a literal
+ """
+ |SELECT
+ | T.a, T.b, T.c, D.name
+ |FROM
+ | MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+ |WHERE CONCAT('Hello-', D.name) = 'Hello-Jark'
+ """.stripMargin
+
+ streamUtil.verifyPlan(sql) // NOTE(review): presumably checked against the plans recorded under this test's name in LookupJoinTest.xml - confirm
+ }
+
// ==========================================================================================
private def expectExceptionThrown(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index e12722d..0b2e8c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -135,6 +135,30 @@ class LookupJoinITCase extends StreamingTestBase {
}
@Test
+ def testJoinTemporalTableWithUdfEqualFilter(): Unit = { // runtime test: executes the lookup join with a function-call equality filter on D.name and checks the emitted rows
+ val streamTable = env.fromCollection(data)
+ .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime) // stream side; the last field is declared as a processing-time attribute
+ tEnv.registerTable("T", streamTable)
+ tEnv.registerTableSource("userTable", userTableSource) // lookup side, aliased as D in the query below
+ val sql =
+ """
+ |SELECT
+ | T.id, T.len, T.content, D.name
+ |FROM
+ | T JOIN userTable for system_time as of T.proctime AS D
+ |ON T.id = D.id
+ |WHERE CONCAT('Hello-', D.name) = 'Hello-Jark'
+ |""".stripMargin
+
+ val sink = new TestingAppendSink // its collected results are read in the assertion at the end
+ tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+ env.execute()
+
+ val expected = Seq("2,15,Hello,Jark") // a single row is expected: the one whose looked-up name is 'Jark', matching the CONCAT filter
+ assertEquals(expected.sorted, sink.getAppendResults.sorted) // both sides are sorted, so the comparison does not depend on emission order
+ }
+
+ @Test
def testJoinTemporalTableOnConstantKey(): Unit = {
val streamTable = env.fromCollection(data)
.toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)