You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/06/27 12:06:11 UTC

[flink] branch release-1.15 updated: [FLINK-25097][table-planner] Fix bug in inner join when the filter condition is boolean type

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.15 by this push:
     new 1e47430137f [FLINK-25097][table-planner] Fix bug in inner join when the filter condition is boolean type
1e47430137f is described below

commit 1e47430137f6db24e2e24cafc30e1227787c6720
Author: zhengyunhong.zyh <zh...@alibaba-inc.com>
AuthorDate: Thu Apr 14 17:04:41 2022 +0800

    [FLINK-25097][table-planner] Fix bug in inner join when the filter condition is boolean type
    
    This closes #19579
    
    (cherry picked from commit 686634aa05aadc33cc7b655d25ad162fe59e41fd)
---
 .../planner/plan/utils/ColumnIntervalUtil.scala    | 57 ++++++++++++----------
 .../runtime/batch/sql/join/JoinITCase.scala        | 22 ++++++++-
 .../planner/runtime/stream/sql/JoinITCase.scala    | 22 +++++++++
 3 files changed, 72 insertions(+), 29 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
index 87bc3aa637f..fb1af546426 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
@@ -232,34 +232,37 @@ object ColumnIntervalUtil {
   }
 
   private def columnIntervalOfSinglePredicate(condition: RexNode): ValueInterval = {
-    val convertedCondition = condition.asInstanceOf[RexCall]
-    if (convertedCondition == null || convertedCondition.operands.size() != 2) {
-      null
-    } else {
-      val (literalValue, op) =
-        (convertedCondition.operands.head, convertedCondition.operands.last) match {
-          case (_: RexInputRef, literal: RexLiteral) =>
-            (getLiteralValueByBroadType(literal), convertedCondition.getKind)
-          case (rex: RexCall, literal: RexLiteral) if rex.getKind == SqlKind.AS =>
-            (getLiteralValueByBroadType(literal), convertedCondition.getKind)
-          case (literal: RexLiteral, _: RexInputRef) =>
-            (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
-          case (literal: RexLiteral, rex: RexCall) if rex.getKind == SqlKind.AS =>
-            (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
-          case _ => (null, null)
-        }
-      if (op == null || literalValue == null) {
-        null
-      } else {
-        op match {
-          case EQUALS => ValueInterval(literalValue, literalValue)
-          case LESS_THAN => ValueInterval(null, literalValue, includeUpper = false)
-          case LESS_THAN_OR_EQUAL => ValueInterval(null, literalValue)
-          case GREATER_THAN => ValueInterval(literalValue, null, includeLower = false)
-          case GREATER_THAN_OR_EQUAL => ValueInterval(literalValue, null)
-          case _ => null
+    condition match {
+      case call: RexCall =>
+        if (call.operands.size() != 2) {
+          null
+        } else {
+          val (literalValue, op) =
+            (call.operands.head, call.operands.last) match {
+              case (_: RexInputRef, literal: RexLiteral) =>
+                (getLiteralValueByBroadType(literal), call.getKind)
+              case (rex: RexCall, literal: RexLiteral) if rex.getKind == SqlKind.AS =>
+                (getLiteralValueByBroadType(literal), call.getKind)
+              case (literal: RexLiteral, _: RexInputRef) =>
+                (getLiteralValueByBroadType(literal), call.getKind.reverse())
+              case (literal: RexLiteral, rex: RexCall) if rex.getKind == SqlKind.AS =>
+                (getLiteralValueByBroadType(literal), call.getKind.reverse())
+              case _ => (null, null)
+            }
+          if (op == null || literalValue == null) {
+            null
+          } else {
+            op match {
+              case EQUALS => ValueInterval(literalValue, literalValue)
+              case LESS_THAN => ValueInterval(null, literalValue, includeUpper = false)
+              case LESS_THAN_OR_EQUAL => ValueInterval(null, literalValue)
+              case GREATER_THAN => ValueInterval(literalValue, null, includeLower = false)
+              case GREATER_THAN_OR_EQUAL => ValueInterval(literalValue, null)
+              case _ => null
+            }
+          }
         }
-      }
+      case _ => null
     }
   }
 
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
index 221361c251f..62a06e99d45 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
@@ -18,7 +18,7 @@
 package org.apache.flink.table.planner.runtime.batch.sql.join
 
 import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, INT_TYPE_INFO, LONG_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
 import org.apache.flink.api.common.typeinfo.Types
 import org.apache.flink.api.common.typeutils.TypeComparator
 import org.apache.flink.api.dag.Transformation
@@ -43,7 +43,6 @@ import org.junit.runners.Parameterized
 import java.util
 
 import scala.collection.JavaConversions._
-import scala.collection.Seq
 
 @RunWith(classOf[Parameterized])
 class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
@@ -233,6 +232,25 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
     )
   }
 
+  @Test
+  def testInnerJoinWithBooleanFilterCondition(): Unit = {
+    val data1: Seq[Row] =
+      Seq(row(1, 1L, "Hi", true), row(2, 2L, "Hello", false), row(3, 2L, "Hello world", true))
+    val type3 = new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, BOOLEAN_TYPE_INFO)
+    registerCollection("table5", data1, type3, "a1, b1, c1, d1")
+    registerCollection("table6", data1, type3, "a2, b2, c2, d2")
+
+    checkResult(
+      "SELECT a1, a1, c2 FROM table5 INNER JOIN table6 ON d1 = d2 where d1 is true",
+      Seq(
+        row("1, 1, Hello world"),
+        row("1, 1, Hi"),
+        row("3, 3, Hello world"),
+        row("3, 3, Hi")
+      )
+    )
+  }
+
   @Test
   def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
     checkResult(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 653ed01d913..efc05be0f7c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -323,6 +323,28 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
     assertEquals(expected.sorted, sink.getRetractResults.sorted)
   }
 
+  @Test
+  def testInnerJoinWithBooleanFilterCondition(): Unit = {
+    val data1 = new mutable.MutableList[(Int, Long, String, Boolean)]
+    data1.+=((1, 1L, "Hi", true))
+    data1.+=((2, 2L, "Hello", false))
+    data1.+=((3, 2L, "Hello world", true))
+
+    val t1 = failingDataSource(data1).toTable(tEnv, 'a1, 'b1, 'c1, 'd1)
+    val t2 = failingDataSource(data1).toTable(tEnv, 'a2, 'b2, 'c2, 'd2)
+    tEnv.registerTable("Table3", t1)
+    tEnv.registerTable("Table5", t2)
+
+    val sqlQuery = "SELECT a1, a1, c2 FROM Table3 INNER JOIN Table5 ON d1 = d2 where d1 is true"
+
+    val sink = new TestingRetractSink
+    tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+    env.execute()
+
+    val expected = Seq("1,1,Hello world", "1,1,Hi", "3,3,Hello world", "3,3,Hi")
+    assertEquals(expected.sorted, sink.getRetractResults.sorted)
+  }
+
   @Test
   def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
     val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"