You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ja...@apache.org on 2020/08/19 02:08:44 UTC

[flink] branch release-1.10 updated: [FLINK-18212][table-planner-blink] Fix Lookup Join failed when there is a UDF equal condition on the column of temporal table

This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new 13c4b42  [FLINK-18212][table-planner-blink] Fix Lookup Join failed when there is a UDF equal condition on the column of temporal table
13c4b42 is described below

commit 13c4b425228c7be691173deec48024d97fce0dc1
Author: Jark Wu <ja...@apache.org>
AuthorDate: Wed Aug 19 10:07:22 2020 +0800

    [FLINK-18212][table-planner-blink] Fix Lookup Join failed when there is a UDF equal condition on the column of temporal table
    
    This closes #13183
---
 .../plan/nodes/common/CommonLookupJoin.scala       |  1 +
 .../plan/stream/sql/join/LookupJoinTest.xml        | 30 ++++++++++++++++++++++
 .../plan/stream/sql/join/LookupJoinTest.scala      | 14 ++++++++++
 .../runtime/stream/sql/LookupJoinITCase.scala      | 24 +++++++++++++++++
 4 files changed, 69 insertions(+)

diff --git a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
index 008532d..7855ce6 100644
--- a/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
+++ b/flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/common/CommonLookupJoin.scala
@@ -601,6 +601,7 @@ abstract class CommonLookupJoin(
       val (inputRef, literal) = (left, right) match {
         case (literal: RexLiteral, ref: RexInputRef) => (ref, literal)
         case (ref: RexInputRef, literal: RexLiteral) => (ref, literal)
+        case _ => return // non-constant condition
       }
       val dataType = FlinkTypeFactory.toLogicalType(inputRef.getType)
       constantFieldMap.put(inputRef.getIndex, ConstantLookupKey(dataType, literal))
diff --git a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
index d8ce0a5..6712da1 100644
--- a/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
+++ b/flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.xml
@@ -350,4 +350,34 @@ Calc(select=[a, b, c, PROCTIME_MATERIALIZE(proctime) AS proctime, rowtime, id, n
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testJoinTemporalTableWithUdfEqualFilter">
+    <Resource name="sql">
+      <![CDATA[
+SELECT
+  T.a, T.b, T.c, D.name
+FROM
+  MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+WHERE CONCAT('Hello-', D.name) = 'Hello-Jark'
+      ]]>
+    </Resource>
+    <Resource name="planBefore">
+      <![CDATA[
+LogicalProject(a=[$0], b=[$1], c=[$2], name=[$6])
++- LogicalFilter(condition=[=(CONCAT(_UTF-16LE'Hello-', $6), _UTF-16LE'Hello-Jark')])
+   +- LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{0, 3}])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalFilter(condition=[=($cor0.a, $0)])
+         +- LogicalSnapshot(period=[$cor0.proctime])
+            +- LogicalTableScan(table=[[default_catalog, default_database, temporalTest, source: [TestTemporalTable(id, name, age)]]])
+]]>
+    </Resource>
+    <Resource name="planAfter">
+      <![CDATA[
+Calc(select=[a, b, c, name])
++- LookupJoin(table=[TestTemporalTable(id, name, age)], joinType=[InnerJoin], async=[false], lookup=[id=a], where=[=(CONCAT(_UTF-16LE'Hello-', name), _UTF-16LE'Hello-Jark':VARCHAR(2147483647) CHARACTER SET "UTF-16LE")], select=[a, b, c, id, name])
+   +- Calc(select=[a, b, c])
+      +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
index d64eb4d..98937d0 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/LookupJoinTest.scala
@@ -394,6 +394,20 @@ class LookupJoinTest extends TableTestBase with Serializable {
     streamUtil.verifyPlan(sql)
   }
 
+  @Test
+  def testJoinTemporalTableWithUdfEqualFilter(): Unit = {
+    val sql =
+      """
+        |SELECT
+        |  T.a, T.b, T.c, D.name
+        |FROM
+        |  MyTable AS T JOIN temporalTest FOR SYSTEM_TIME AS OF T.proctime AS D ON T.a = D.id
+        |WHERE CONCAT('Hello-', D.name) = 'Hello-Jark'
+      """.stripMargin
+
+    streamUtil.verifyPlan(sql)
+  }
+
   // ==========================================================================================
 
   private def expectExceptionThrown(
diff --git a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
index e12722d..0b2e8c3 100644
--- a/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
+++ b/flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/LookupJoinITCase.scala
@@ -135,6 +135,30 @@ class LookupJoinITCase extends StreamingTestBase {
   }
 
   @Test
+  def testJoinTemporalTableWithUdfEqualFilter(): Unit = {
+    val streamTable = env.fromCollection(data)
+      .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)
+    tEnv.registerTable("T", streamTable)
+    tEnv.registerTableSource("userTable", userTableSource)
+    val sql =
+      """
+        |SELECT
+        |  T.id, T.len, T.content, D.name
+        |FROM
+        |  T JOIN userTable for system_time as of T.proctime AS D
+        |ON T.id = D.id
+        |WHERE CONCAT('Hello-', D.name) = 'Hello-Jark'
+        |""".stripMargin
+
+    val sink = new TestingAppendSink
+    tEnv.sqlQuery(sql).toAppendStream[Row].addSink(sink)
+    env.execute()
+
+    val expected = Seq("2,15,Hello,Jark")
+    assertEquals(expected.sorted, sink.getAppendResults.sorted)
+  }
+
+  @Test
   def testJoinTemporalTableOnConstantKey(): Unit = {
     val streamTable = env.fromCollection(data)
       .toTable(tEnv, 'id, 'len, 'content, 'proctime.proctime)