Posted to commits@flink.apache.org by go...@apache.org on 2021/11/19 01:44:51 UTC

[flink] branch master updated: [FLINK-24835][table-planner] Fix bug in `RelTimeIndicatorConverter` when materializing time attribute fields of regular join's inputs

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aa74d5  [FLINK-24835][table-planner] Fix bug in `RelTimeIndicatorConverter` when materializing time attribute fields of regular join's inputs
8aa74d5 is described below

commit 8aa74d5ad7026734bdd98eabbc9cbbb243bbe8b0
Author: Jing <be...@126.com>
AuthorDate: Tue Nov 9 17:53:18 2021 +0800

    [FLINK-24835][table-planner] Fix bug in `RelTimeIndicatorConverter` when materializing time attribute fields of regular join's inputs
    
    This closes #17733
---
 .../planner/calcite/RelTimeIndicatorConverter.java |   4 +-
 .../physical/stream/StreamPhysicalJoinRule.scala   |   2 +-
 .../planner/plan/utils/IntervalJoinUtil.scala      |  17 ++-
 .../flink/table/planner/plan/utils/JoinUtil.scala  |  25 ++--
 .../planner/plan/utils/TemporalJoinUtil.scala      |   7 +-
 .../table/planner/plan/utils/WindowJoinUtil.scala  |  18 ++-
 .../plan/stream/sql/join/IntervalJoinTest.xml      | 130 +++++++++++++++++----
 .../plan/stream/sql/join/IntervalJoinTest.scala    |  15 +++
 .../operators/join/window/WindowJoinOperator.java  |   6 +-
 9 files changed, 173 insertions(+), 51 deletions(-)
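
In short: RelTimeIndicatorConverter rewrites the join's children, so the converted newLeft/newRight
can differ from join.getLeft()/join.getRight() -- their time-indicator fields may already have been
materialized into plain TIMESTAMP columns. The classification helpers (satisfyTemporalJoin,
satisfyRegularJoin, satisfyIntervalJoin, satisfyWindowJoin) previously inspected the original inputs,
so their verdict could disagree with the inputs the converter actually wires up; the new
testFallbackToRegularJoin plan below shows such a query now ending up as a regular join with a
materialized rowtime. A minimal sketch of the decision flow after this commit -- classify is a
hypothetical helper, not part of the change:

    import org.apache.calcite.rel.RelNode
    import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
    import org.apache.flink.table.planner.plan.utils.{IntervalJoinUtil, JoinUtil, TemporalJoinUtil}

    // Illustrative only: the real decision is made in RelTimeIndicatorConverter (first hunk below).
    // All checks now see the converted inputs instead of the join's original children.
    def classify(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): String = {
      if (TemporalJoinUtil.satisfyTemporalJoin(join, newLeft, newRight)) {
        "temporal table join"
      } else if (JoinUtil.satisfyRegularJoin(join, newLeft, newRight)) {
        "regular join: time attributes of both inputs get materialized"
      } else if (IntervalJoinUtil.satisfyIntervalJoin(join, newLeft, newRight)) {
        "interval join"
      } else {
        "other stream join (e.g. window join)"
      }
    }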

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
index 52c8d7f..62be565 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
@@ -265,7 +265,7 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {
         int leftFieldCount = newLeft.getRowType().getFieldCount();
 
         // temporal table join
-        if (TemporalJoinUtil.satisfyTemporalJoin(join)) {
+        if (TemporalJoinUtil.satisfyTemporalJoin(join, newLeft, newRight)) {
             RelNode rewrittenTemporalJoin =
                     join.copy(
                             join.getTraitSet(),
@@ -282,7 +282,7 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {
                             .collect(Collectors.toSet());
             return createCalcToMaterializeTimeIndicators(rewrittenTemporalJoin, rightIndices);
         } else {
-            if (JoinUtil.satisfyRegularJoin(join, join.getRight())) {
+            if (JoinUtil.satisfyRegularJoin(join, newLeft, newRight)) {
                 // materialize time attribute fields of regular join's inputs
                 newLeft = materializeTimeIndicators(newLeft);
                 newRight = materializeTimeIndicators(newRight);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
index a525ae9..9e217e9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
@@ -44,7 +44,7 @@ class StreamPhysicalJoinRule
     val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel]
     val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel]
 
-    if (!satisfyRegularJoin(join, right)) {
+    if (!satisfyRegularJoin(join, left, right)) {
       return false
     }
 
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
index cdef2687..d354b65 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
@@ -29,6 +29,8 @@ import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
 import org.apache.calcite.plan.RelOptUtil
 import org.apache.calcite.sql.`type`.SqlTypeName
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.validate.SqlValidatorUtil
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rel.`type`.RelDataType
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.SqlKind
@@ -450,15 +452,26 @@ object IntervalJoinUtil {
    *         else false.
    */
   def satisfyIntervalJoin(join: FlinkLogicalJoin): Boolean = {
+    satisfyIntervalJoin(join, join.getLeft, join.getRight)
+  }
+
+  def satisfyIntervalJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
     // TODO support SEMI/ANTI join
     if (!join.getJoinType.projectsRight) {
       return false
     }
+    val newJoinRowType = SqlValidatorUtil.deriveJoinRowType(
+      newLeft.getRowType,
+      newRight.getRowType,
+      join.getJoinType,
+      join.getCluster.getTypeFactory,
+      null,
+      join.getSystemFieldList)
     val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(join)
     val (windowBounds, _) = extractWindowBoundsFromPredicate(
       join.getCondition,
-      join.getLeft.getRowType.getFieldCount,
-      join.getRowType,
+      newLeft.getRowType.getFieldCount,
+      newJoinRowType,
       join.getCluster.getRexBuilder,
       tableConfig)
     windowBounds.nonEmpty
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
index ff21ea9..cf9a3a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
@@ -237,34 +237,23 @@ object JoinUtil {
    * Check whether input join node satisfy preconditions to convert into regular join.
    *
    * @param join input join to analyze.
+   * @param newLeft new left child of join
+   * @param newRight new right child of join
    *
    * @return True if input join node satisfy preconditions to convert into regular join,
    *         else false.
    */
-  def satisfyRegularJoin(join: FlinkLogicalJoin): Boolean = {
-    satisfyRegularJoin(join, join.getRight)
-  }
-
-  /**
-   * Check whether input join node satisfy preconditions to convert into regular join.
-   *
-   * @param join input join to analyze.
-   * @param right right child of input join
-   *
-   * @return True if input join node satisfy preconditions to convert into regular join,
-   *         else false.
-   */
-  def satisfyRegularJoin(join: FlinkLogicalJoin, right: RelNode): Boolean = {
-    if (right.isInstanceOf[FlinkLogicalSnapshot]) {
+  def satisfyRegularJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
+    if (newRight.isInstanceOf[FlinkLogicalSnapshot]) {
       // exclude lookup join
       false
-    } else if (satisfyTemporalJoin(join)) {
+    } else if (satisfyTemporalJoin(join, newLeft, newRight)) {
       // exclude temporal table join
       false
-    } else if (satisfyIntervalJoin(join)) {
+    } else if (satisfyIntervalJoin(join, newLeft, newRight)) {
       // exclude interval join
       false
-    } else if (satisfyWindowJoin(join)) {
+    } else if (satisfyWindowJoin(join, newLeft, newRight)) {
       // exclude window join
       false
     } else {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
index 63d0d3e..1fe967f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec
 import org.apache.flink.util.Preconditions.checkState
 
 import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rex._
 import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
 import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
@@ -424,10 +425,14 @@ object TemporalJoinUtil {
    *         else false.
    */
   def satisfyTemporalJoin(join: FlinkLogicalJoin): Boolean = {
+    satisfyTemporalJoin(join, join.getLeft, join.getRight)
+  }
+
+  def satisfyTemporalJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
     if (!containsTemporalJoinCondition(join.getCondition)) {
       return false
     }
-    val joinInfo = JoinInfo.of(join.getLeft, join.getRight, join.getCondition)
+    val joinInfo = JoinInfo.of(newLeft, newRight, join.getCondition)
     if (isTemporalFunctionJoin(join.getCluster.getRexBuilder, joinInfo)) {
       // Temporal table function join currently only support INNER JOIN
       join.getJoinType match {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
index f155761..3284dc6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
 import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
 import org.apache.flink.table.planner.utils.Logging
 
+import org.apache.calcite.rel.RelNode
 import org.apache.calcite.rex.{RexInputRef, RexNode, RexUtil}
 import org.apache.calcite.sql.fun.SqlStdOperatorTable
 
@@ -60,7 +61,11 @@ object WindowJoinUtil extends Logging {
    *         ends equality of input tables, else false.
    */
   def satisfyWindowJoin(join: FlinkLogicalJoin): Boolean = {
-    excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join) match {
+    satisfyWindowJoin(join, join.getLeft, join.getRight)
+  }
+
+  def satisfyWindowJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
+    excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join, newLeft, newRight) match {
       case Some((windowStartEqualityLeftKeys, windowEndEqualityLeftKeys, _, _)) =>
         windowStartEqualityLeftKeys.nonEmpty && windowEndEqualityLeftKeys.nonEmpty
       case _ => false
@@ -166,6 +171,13 @@ object WindowJoinUtil extends Logging {
    */
   private def excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(
       join: FlinkLogicalJoin): Option[(Array[Int], Array[Int], Array[Int], Array[Int])] = {
+    excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join, join.getLeft, join.getRight)
+  }
+
+  private def excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(
+      join: FlinkLogicalJoin,
+      newLeft: RelNode,
+      newRight: RelNode): Option[(Array[Int], Array[Int], Array[Int], Array[Int])] = {
     val joinInfo = join.analyzeCondition()
     val (leftWindowProperties, rightWindowProperties) = getChildWindowProperties(join)
 
@@ -222,8 +234,8 @@ object WindowJoinUtil extends Logging {
         )
       }
     } else if (windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty) {
-      val leftFieldNames = join.getLeft.getRowType.getFieldNames.toList
-      val rightFieldNames = join.getRight.getRowType.getFieldNames.toList
+      val leftFieldNames = newLeft.getRowType.getFieldNames.toList
+      val rightFieldNames = newRight.getRowType.getFieldNames.toList
       val inputFieldNames = leftFieldNames ++ rightFieldNames
       val condition = join.getExpressionString(
         join.getCondition,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
index 331c8bb..2499c26 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
@@ -16,6 +16,51 @@ See the License for the specific language governing permissions and
 limitations under the License.
 -->
 <Root>
+  <TestCase name="testFallbackToRegularJoin">
+		<Resource name="sql">
+			<![CDATA[
+SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
+ SELECT t2.a FROM MyTable2 t2
+   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE
+   GROUP BY t2.a
+)
+    ]]>
+		</Resource>
+		<Resource name="ast">
+			<![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[IN($0, {
+LogicalAggregate(group=[{0}])
+  LogicalProject(a=[$0])
+    LogicalFilter(condition=[AND(=($cor0.b, $1), >=($cor0.rowtime, $4), <=($cor0.rowtime, +($4, 300000:INTERVAL MINUTE)))])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+		</Resource>
+		<Resource name="optimized exec plan">
+			<![CDATA[
+Calc(select=[a])
++- Join(joinType=[InnerJoin], where=[((b = b0) AND (rowtime = rowtime0) AND (a = a0))], select=[a, b, rowtime, a0, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+   :- Exchange(distribution=[hash[b, rowtime, a]])
+   :  +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+   :     +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])(reuse_id=[1])
+   +- Exchange(distribution=[hash[b0, rowtime0, a]])
+      +- GroupAggregate(groupBy=[a, b0, rowtime0], select=[a, b0, rowtime0])
+         +- Exchange(distribution=[hash[a, b0, rowtime0]])
+            +- Calc(select=[a, b0, rowtime0])
+               +- Join(joinType=[InnerJoin], where=[((b0 = b) AND (rowtime0 >= rowtime) AND (rowtime0 <= (rowtime + 300000:INTERVAL MINUTE)))], select=[a, b, rowtime, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+                  :- Exchange(distribution=[hash[b]])
+                  :  +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+                  :     +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                  +- Exchange(distribution=[hash[b]])
+                     +- GroupAggregate(groupBy=[b, rowtime], select=[b, rowtime])
+                        +- Exchange(distribution=[hash[b, rowtime]])
+                           +- Calc(select=[b, CAST(rowtime) AS rowtime])
+                              +- Reused(reference_id=[1])
+]]>
+		</Resource>
+	</TestCase>
   <TestCase name="testInteravalDiffTimeIndicator">
     <Resource name="sql">
       <![CDATA[
@@ -595,27 +640,27 @@ Calc(select=[a, b])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRowTimeRightOuterJoin">
+  <TestCase name="testRowTimeInnerJoinWithWhereClause">
     <Resource name="sql">
       <![CDATA[
-SELECT t1.a, t2.b
-FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
+SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE
   t1.a = t2.a AND
-  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR
+  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' MINUTE AND t2.rowtime + INTERVAL '1' HOUR
       ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$6])
-+- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[right])
-   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
++- LogicalFilter(condition=[AND(=($0, $5), >=($4, -($9, 600000:INTERVAL MINUTE)), <=($4, +($9, 3600000:INTERVAL HOUR)))])
+   +- LogicalJoin(condition=[true], joinType=[inner])
+      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- IntervalJoin(joinType=[RightOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a, rowtime])
    :     +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -625,27 +670,27 @@ Calc(select=[a, b])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRowTimeInnerJoinWithWhereClause">
+  <TestCase name="testRowTimeLeftOuterJoin">
     <Resource name="sql">
       <![CDATA[
-SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE
+SELECT t1.a, t2.b
+FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
   t1.a = t2.a AND
-  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' MINUTE AND t2.rowtime + INTERVAL '1' HOUR
+  t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR
       ]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$6])
-+- LogicalFilter(condition=[AND(=($0, $5), >=($4, -($9, 600000:INTERVAL MINUTE)), <=($4, +($9, 3600000:INTERVAL HOUR)))])
-   +- LogicalJoin(condition=[true], joinType=[inner])
-      :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
-      +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
++- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[left])
+   :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
 ]]>
     </Resource>
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a, rowtime])
    :     +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -655,11 +700,11 @@ Calc(select=[a, b])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRowTimeLeftOuterJoin">
+  <TestCase name="testRowTimeRightOuterJoin">
     <Resource name="sql">
       <![CDATA[
 SELECT t1.a, t2.b
-FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
+FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
   t1.a = t2.a AND
   t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR
       ]]>
@@ -667,7 +712,7 @@ FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a=[$0], b=[$6])
-+- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[left])
++- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[right])
    :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
    +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
 ]]>
@@ -675,7 +720,7 @@ LogicalProject(a=[$0], b=[$6])
     <Resource name="optimized exec plan">
       <![CDATA[
 Calc(select=[a, b])
-+- IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[RightOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
    :- Exchange(distribution=[hash[a]])
    :  +- Calc(select=[a, rowtime])
    :     +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -685,4 +730,49 @@ Calc(select=[a, b])
 ]]>
     </Resource>
   </TestCase>
+  <TestCase name="testSemiIntervalJoinWithSimpleConditionAndGroup">
+    <Resource name="sql">
+      <![CDATA[
+SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
+ SELECT t2.a FROM MyTable2 t2
+   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE
+   GROUP BY t2.a
+)
+    ]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[IN($0, {
+LogicalAggregate(group=[{0}])
+  LogicalProject(a=[$0])
+    LogicalFilter(condition=[AND(=($cor0.b, $1), >=($cor0.rowtime, $4), <=($cor0.rowtime, +($4, 300000:INTERVAL MINUTE)))])
+      LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+    </Resource>
+    <Resource name="optimized exec plan">
+      <![CDATA[
+Calc(select=[a])
++- Join(joinType=[InnerJoin], where=[((b = b0) AND (rowtime = rowtime0) AND (a = a0))], select=[a, b, rowtime, a0, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+   :- Exchange(distribution=[hash[b, rowtime, a]])
+   :  +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+   :     +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])(reuse_id=[1])
+   +- Exchange(distribution=[hash[b0, rowtime0, a]])
+      +- GroupAggregate(groupBy=[a, b0, rowtime0], select=[a, b0, rowtime0])
+         +- Exchange(distribution=[hash[a, b0, rowtime0]])
+            +- Calc(select=[a, b0, rowtime0])
+               +- Join(joinType=[InnerJoin], where=[((b0 = b) AND (rowtime0 >= rowtime) AND (rowtime0 <= (rowtime + 300000:INTERVAL MINUTE)))], select=[a, b, rowtime, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+                  :- Exchange(distribution=[hash[b]])
+                  :  +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+                  :     +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+                  +- Exchange(distribution=[hash[b]])
+                     +- GroupAggregate(groupBy=[b, rowtime], select=[b, rowtime])
+                        +- Exchange(distribution=[hash[b, rowtime]])
+                           +- Calc(select=[b, CAST(rowtime) AS rowtime])
+                              +- Reused(reference_id=[1])
+]]>
+    </Resource>
+  </TestCase>
 </Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
index 7dc82f3..b57767b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
@@ -493,6 +493,21 @@ class IntervalJoinTest extends TableTestBase {
       ">($2, $6)")
   }
 
+  @Test
+  def testFallbackToRegularJoin(): Unit = {
+    // the following query would be translated into a regular join instead of an interval join
+    // because the time attribute of the right side would be materialized.
+    val sql =
+      """
+        |SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
+        | SELECT t2.a FROM MyTable2 t2
+        |   WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE
+        |   GROUP BY t2.a
+        |)
+    """.stripMargin
+    util.verifyExecPlan(sql)
+  }
+
   private def verifyTimeBoundary(
       timeConditionSql: String,
       expLeftSize: Long,
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
index 66a5de2..9acfdd8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -299,10 +299,8 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
                 boolean matches = false;
                 for (RowData rightRecord : rightRecords) {
                     if (joinCondition.apply(leftRecord, rightRecord)) {
-                        if (joinCondition.apply(leftRecord, rightRecord)) {
-                            matches = true;
-                            break;
-                        }
+                        matches = true;
+                        break;
                     }
                 }
                 if (matches) {
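
Side note on the last hunk: in WindowJoinOperator the matching loop applied joinCondition a second
time inside an accidentally duplicated if; the cleanup drops the inner check so each right-side
record is tested once and the scan stops at the first match. A minimal sketch of that first-match
semantics (hasMatch is a hypothetical stand-in, not Flink API):

    // Illustrative only -- the operator itself is Java and works on RowData.
    // exists() short-circuits, matching the `break` on the first successful condition.
    def hasMatch[L, R](leftRecord: L, rightRecords: Iterable[R])(condition: (L, R) => Boolean): Boolean =
      rightRecords.exists(rightRecord => condition(leftRecord, rightRecord))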