You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@flink.apache.org by go...@apache.org on 2022/06/27 12:06:11 UTC
[flink] branch release-1.15 updated: [FLINK-25097][table-planner] Fix bug in inner join when the filter condition is boolean type
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch release-1.15
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.15 by this push:
new 1e47430137f [FLINK-25097][table-planner] Fix bug in inner join when the filter condition is boolean type
1e47430137f is described below
commit 1e47430137f6db24e2e24cafc30e1227787c6720
Author: zhengyunhong.zyh <zh...@alibaba-inc.com>
AuthorDate: Thu Apr 14 17:04:41 2022 +0800
[FLINK-25097][table-planner] Fix bug in inner join when the filter condition is boolean type
This closes #19579
(cherry picked from commit 686634aa05aadc33cc7b655d25ad162fe59e41fd)
---
.../planner/plan/utils/ColumnIntervalUtil.scala | 57 ++++++++++++----------
.../runtime/batch/sql/join/JoinITCase.scala | 22 ++++++++-
.../planner/runtime/stream/sql/JoinITCase.scala | 22 +++++++++
3 files changed, 72 insertions(+), 29 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
index 87bc3aa637f..fb1af546426 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/ColumnIntervalUtil.scala
@@ -232,34 +232,37 @@ object ColumnIntervalUtil {
}
private def columnIntervalOfSinglePredicate(condition: RexNode): ValueInterval = {
- val convertedCondition = condition.asInstanceOf[RexCall]
- if (convertedCondition == null || convertedCondition.operands.size() != 2) {
- null
- } else {
- val (literalValue, op) =
- (convertedCondition.operands.head, convertedCondition.operands.last) match {
- case (_: RexInputRef, literal: RexLiteral) =>
- (getLiteralValueByBroadType(literal), convertedCondition.getKind)
- case (rex: RexCall, literal: RexLiteral) if rex.getKind == SqlKind.AS =>
- (getLiteralValueByBroadType(literal), convertedCondition.getKind)
- case (literal: RexLiteral, _: RexInputRef) =>
- (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
- case (literal: RexLiteral, rex: RexCall) if rex.getKind == SqlKind.AS =>
- (getLiteralValueByBroadType(literal), convertedCondition.getKind.reverse())
- case _ => (null, null)
- }
- if (op == null || literalValue == null) {
- null
- } else {
- op match {
- case EQUALS => ValueInterval(literalValue, literalValue)
- case LESS_THAN => ValueInterval(null, literalValue, includeUpper = false)
- case LESS_THAN_OR_EQUAL => ValueInterval(null, literalValue)
- case GREATER_THAN => ValueInterval(literalValue, null, includeLower = false)
- case GREATER_THAN_OR_EQUAL => ValueInterval(literalValue, null)
- case _ => null
+ condition match {
+ case call: RexCall =>
+ if (call.operands.size() != 2) {
+ null
+ } else {
+ val (literalValue, op) =
+ (call.operands.head, call.operands.last) match {
+ case (_: RexInputRef, literal: RexLiteral) =>
+ (getLiteralValueByBroadType(literal), call.getKind)
+ case (rex: RexCall, literal: RexLiteral) if rex.getKind == SqlKind.AS =>
+ (getLiteralValueByBroadType(literal), call.getKind)
+ case (literal: RexLiteral, _: RexInputRef) =>
+ (getLiteralValueByBroadType(literal), call.getKind.reverse())
+ case (literal: RexLiteral, rex: RexCall) if rex.getKind == SqlKind.AS =>
+ (getLiteralValueByBroadType(literal), call.getKind.reverse())
+ case _ => (null, null)
+ }
+ if (op == null || literalValue == null) {
+ null
+ } else {
+ op match {
+ case EQUALS => ValueInterval(literalValue, literalValue)
+ case LESS_THAN => ValueInterval(null, literalValue, includeUpper = false)
+ case LESS_THAN_OR_EQUAL => ValueInterval(null, literalValue)
+ case GREATER_THAN => ValueInterval(literalValue, null, includeLower = false)
+ case GREATER_THAN_OR_EQUAL => ValueInterval(literalValue, null)
+ case _ => null
+ }
+ }
}
- }
+ case _ => null
}
}
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
index 221361c251f..62a06e99d45 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/join/JoinITCase.scala
@@ -18,7 +18,7 @@
package org.apache.flink.table.planner.runtime.batch.sql.join
import org.apache.flink.api.common.ExecutionConfig
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo.{DOUBLE_TYPE_INFO, INT_TYPE_INFO, LONG_TYPE_INFO}
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo._
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.api.common.typeutils.TypeComparator
import org.apache.flink.api.dag.Transformation
@@ -43,7 +43,6 @@ import org.junit.runners.Parameterized
import java.util
import scala.collection.JavaConversions._
-import scala.collection.Seq
@RunWith(classOf[Parameterized])
class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
@@ -233,6 +232,25 @@ class JoinITCase(expectedJoinType: JoinType) extends BatchTestBase {
)
}
+ @Test
+ def testInnerJoinWithBooleanFilterCondition(): Unit = {
+ val data1: Seq[Row] =
+ Seq(row(1, 1L, "Hi", true), row(2, 2L, "Hello", false), row(3, 2L, "Hello world", true))
+ val type3 = new RowTypeInfo(INT_TYPE_INFO, LONG_TYPE_INFO, STRING_TYPE_INFO, BOOLEAN_TYPE_INFO)
+ registerCollection("table5", data1, type3, "a1, b1, c1, d1")
+ registerCollection("table6", data1, type3, "a2, b2, c2, d2")
+
+ checkResult(
+ "SELECT a1, a1, c2 FROM table5 INNER JOIN table6 ON d1 = d2 where d1 is true",
+ Seq(
+ row("1, 1, Hello world"),
+ row("1, 1, Hi"),
+ row("3, 3, Hello world"),
+ row("3, 3, Hi")
+ )
+ )
+ }
+
@Test
def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
checkResult(
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 653ed01d913..efc05be0f7c 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -323,6 +323,28 @@ class JoinITCase(state: StateBackendMode) extends StreamingWithStateTestBase(sta
assertEquals(expected.sorted, sink.getRetractResults.sorted)
}
+ @Test
+ def testInnerJoinWithBooleanFilterCondition(): Unit = {
+ val data1 = new mutable.MutableList[(Int, Long, String, Boolean)]
+ data1.+=((1, 1L, "Hi", true))
+ data1.+=((2, 2L, "Hello", false))
+ data1.+=((3, 2L, "Hello world", true))
+
+ val t1 = failingDataSource(data1).toTable(tEnv, 'a1, 'b1, 'c1, 'd1)
+ val t2 = failingDataSource(data1).toTable(tEnv, 'a2, 'b2, 'c2, 'd2)
+ tEnv.registerTable("Table3", t1)
+ tEnv.registerTable("Table5", t2)
+
+ val sqlQuery = "SELECT a1, a1, c2 FROM Table3 INNER JOIN Table5 ON d1 = d2 where d1 is true"
+
+ val sink = new TestingRetractSink
+ tEnv.sqlQuery(sqlQuery).toRetractStream[Row].addSink(sink).setParallelism(1)
+ env.execute()
+
+ val expected = Seq("1,1,Hello world", "1,1,Hi", "3,3,Hello world", "3,3,Hi")
+ assertEquals(expected.sorted, sink.getRetractResults.sorted)
+ }
+
@Test
def testInnerJoinWithNonEquiJoinPredicate(): Unit = {
val sqlQuery = "SELECT c, g FROM Table3, Table5 WHERE b = e AND a < 6 AND h < b"