You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2021/11/19 01:44:51 UTC
[flink] branch master updated: [FLINK-24835][table-planner] Fix bug in `RelTimeIndicatorConverter` when materialize time attribute fields of regular join's inputs
This is an automated email from the ASF dual-hosted git repository.
godfrey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 8aa74d5 [FLINK-24835][table-planner] Fix bug in `RelTimeIndicatorConverter` when materialize time attribute fields of regular join's inputs
8aa74d5 is described below
commit 8aa74d5ad7026734bdd98eabbc9cbbb243bbe8b0
Author: Jing <be...@126.com>
AuthorDate: Tue Nov 9 17:53:18 2021 +0800
[FLINK-24835][table-planner] Fix bug in `RelTimeIndicatorConverter` when materialize time attribute fields of regular join's inputs
This closes #17733
---
.../planner/calcite/RelTimeIndicatorConverter.java | 4 +-
.../physical/stream/StreamPhysicalJoinRule.scala | 2 +-
.../planner/plan/utils/IntervalJoinUtil.scala | 17 ++-
.../flink/table/planner/plan/utils/JoinUtil.scala | 25 ++--
.../planner/plan/utils/TemporalJoinUtil.scala | 7 +-
.../table/planner/plan/utils/WindowJoinUtil.scala | 18 ++-
.../plan/stream/sql/join/IntervalJoinTest.xml | 130 +++++++++++++++++----
.../plan/stream/sql/join/IntervalJoinTest.scala | 15 +++
.../operators/join/window/WindowJoinOperator.java | 6 +-
9 files changed, 173 insertions(+), 51 deletions(-)
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
index 52c8d7f..62be565 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/calcite/RelTimeIndicatorConverter.java
@@ -265,7 +265,7 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {
int leftFieldCount = newLeft.getRowType().getFieldCount();
// temporal table join
- if (TemporalJoinUtil.satisfyTemporalJoin(join)) {
+ if (TemporalJoinUtil.satisfyTemporalJoin(join, newLeft, newRight)) {
RelNode rewrittenTemporalJoin =
join.copy(
join.getTraitSet(),
@@ -282,7 +282,7 @@ public final class RelTimeIndicatorConverter extends RelHomogeneousShuttle {
.collect(Collectors.toSet());
return createCalcToMaterializeTimeIndicators(rewrittenTemporalJoin, rightIndices);
} else {
- if (JoinUtil.satisfyRegularJoin(join, join.getRight())) {
+ if (JoinUtil.satisfyRegularJoin(join, newLeft, newRight)) {
// materialize time attribute fields of regular join's inputs
newLeft = materializeTimeIndicators(newLeft);
newRight = materializeTimeIndicators(newRight);
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
index a525ae9..9e217e9 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/StreamPhysicalJoinRule.scala
@@ -44,7 +44,7 @@ class StreamPhysicalJoinRule
val left: FlinkLogicalRel = call.rel(1).asInstanceOf[FlinkLogicalRel]
val right: FlinkLogicalRel = call.rel(2).asInstanceOf[FlinkLogicalRel]
- if (!satisfyRegularJoin(join, right)) {
+ if (!satisfyRegularJoin(join, left, right)) {
return false
}
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
index cdef2687..d354b65 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/IntervalJoinUtil.scala
@@ -29,6 +29,8 @@ import org.apache.flink.table.planner.plan.schema.TimeIndicatorRelDataType
import org.apache.calcite.plan.RelOptUtil
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.sql.fun.SqlStdOperatorTable
+import org.apache.calcite.sql.validate.SqlValidatorUtil
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlKind
@@ -450,15 +452,26 @@ object IntervalJoinUtil {
* else false.
*/
def satisfyIntervalJoin(join: FlinkLogicalJoin): Boolean = {
+ satisfyIntervalJoin(join, join.getLeft, join.getRight)
+ }
+
+ def satisfyIntervalJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
// TODO support SEMI/ANTI join
if (!join.getJoinType.projectsRight) {
return false
}
+ val newJoinRowType = SqlValidatorUtil.deriveJoinRowType(
+ newLeft.getRowType,
+ newRight.getRowType,
+ join.getJoinType,
+ join.getCluster.getTypeFactory,
+ null,
+ join.getSystemFieldList)
val tableConfig = FlinkRelOptUtil.getTableConfigFromContext(join)
val (windowBounds, _) = extractWindowBoundsFromPredicate(
join.getCondition,
- join.getLeft.getRowType.getFieldCount,
- join.getRowType,
+ newLeft.getRowType.getFieldCount,
+ newJoinRowType,
join.getCluster.getRexBuilder,
tableConfig)
windowBounds.nonEmpty
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
index ff21ea9..cf9a3a3 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/JoinUtil.scala
@@ -237,34 +237,23 @@ object JoinUtil {
* Check whether input join node satisfy preconditions to convert into regular join.
*
* @param join input join to analyze.
+ * @param newLeft new left child of join
+ * @param newRight new right child of join
*
* @return True if input join node satisfy preconditions to convert into regular join,
* else false.
*/
- def satisfyRegularJoin(join: FlinkLogicalJoin): Boolean = {
- satisfyRegularJoin(join, join.getRight)
- }
-
- /**
- * Check whether input join node satisfy preconditions to convert into regular join.
- *
- * @param join input join to analyze.
- * @param right right child of input join
- *
- * @return True if input join node satisfy preconditions to convert into regular join,
- * else false.
- */
- def satisfyRegularJoin(join: FlinkLogicalJoin, right: RelNode): Boolean = {
- if (right.isInstanceOf[FlinkLogicalSnapshot]) {
+ def satisfyRegularJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
+ if (newRight.isInstanceOf[FlinkLogicalSnapshot]) {
// exclude lookup join
false
- } else if (satisfyTemporalJoin(join)) {
+ } else if (satisfyTemporalJoin(join, newLeft, newRight)) {
// exclude temporal table join
false
- } else if (satisfyIntervalJoin(join)) {
+ } else if (satisfyIntervalJoin(join, newLeft, newRight)) {
// exclude interval join
false
- } else if (satisfyWindowJoin(join)) {
+ } else if (satisfyWindowJoin(join, newLeft, newRight)) {
// exclude window join
false
} else {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
index 63d0d3e..1fe967f 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/TemporalJoinUtil.scala
@@ -24,6 +24,7 @@ import org.apache.flink.table.planner.plan.nodes.exec.spec.JoinSpec
import org.apache.flink.util.Preconditions.checkState
import org.apache.calcite.rel.core.{JoinInfo, JoinRelType}
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex._
import org.apache.calcite.sql.`type`.{OperandTypes, ReturnTypes}
import org.apache.calcite.sql.{SqlFunction, SqlFunctionCategory, SqlKind}
@@ -424,10 +425,14 @@ object TemporalJoinUtil {
* else false.
*/
def satisfyTemporalJoin(join: FlinkLogicalJoin): Boolean = {
+ satisfyTemporalJoin(join, join.getLeft, join.getRight)
+ }
+
+ def satisfyTemporalJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
if (!containsTemporalJoinCondition(join.getCondition)) {
return false
}
- val joinInfo = JoinInfo.of(join.getLeft, join.getRight, join.getCondition)
+ val joinInfo = JoinInfo.of(newLeft, newRight, join.getCondition)
if (isTemporalFunctionJoin(join.getCluster.getRexBuilder, joinInfo)) {
// Temporal table function join currently only support INNER JOIN
join.getJoinType match {
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
index f155761..3284dc6 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/WindowJoinUtil.scala
@@ -23,6 +23,7 @@ import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery
import org.apache.flink.table.planner.plan.nodes.logical.FlinkLogicalJoin
import org.apache.flink.table.planner.utils.Logging
+import org.apache.calcite.rel.RelNode
import org.apache.calcite.rex.{RexInputRef, RexNode, RexUtil}
import org.apache.calcite.sql.fun.SqlStdOperatorTable
@@ -60,7 +61,11 @@ object WindowJoinUtil extends Logging {
* ends equality of input tables, else false.
*/
def satisfyWindowJoin(join: FlinkLogicalJoin): Boolean = {
- excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join) match {
+ satisfyWindowJoin(join, join.getLeft, join.getRight)
+ }
+
+ def satisfyWindowJoin(join: FlinkLogicalJoin, newLeft: RelNode, newRight: RelNode): Boolean = {
+ excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join, newLeft, newRight) match {
case Some((windowStartEqualityLeftKeys, windowEndEqualityLeftKeys, _, _)) =>
windowStartEqualityLeftKeys.nonEmpty && windowEndEqualityLeftKeys.nonEmpty
case _ => false
@@ -166,6 +171,13 @@ object WindowJoinUtil extends Logging {
*/
private def excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(
join: FlinkLogicalJoin): Option[(Array[Int], Array[Int], Array[Int], Array[Int])] = {
+ excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join, join.getLeft, join.getRight)
+ }
+
+ private def excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(
+ join: FlinkLogicalJoin,
+ newLeft: RelNode,
+ newRight: RelNode): Option[(Array[Int], Array[Int], Array[Int], Array[Int])] = {
val joinInfo = join.analyzeCondition()
val (leftWindowProperties, rightWindowProperties) = getChildWindowProperties(join)
@@ -222,8 +234,8 @@ object WindowJoinUtil extends Logging {
)
}
} else if (windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty) {
- val leftFieldNames = join.getLeft.getRowType.getFieldNames.toList
- val rightFieldNames = join.getRight.getRowType.getFieldNames.toList
+ val leftFieldNames = newLeft.getRowType.getFieldNames.toList
+ val rightFieldNames = newRight.getRowType.getFieldNames.toList
val inputFieldNames = leftFieldNames ++ rightFieldNames
val condition = join.getExpressionString(
join.getCondition,
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
index 331c8bb..2499c26 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.xml
@@ -16,6 +16,51 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
+ <TestCase name="testFallbackToRegularJoin">
+ <Resource name="sql">
+ <![CDATA[
+SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
+ SELECT t2.a FROM MyTable2 t2
+ WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE
+ GROUP BY t2.a
+)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[IN($0, {
+LogicalAggregate(group=[{0}])
+ LogicalProject(a=[$0])
+ LogicalFilter(condition=[AND(=($cor0.b, $1), >=($cor0.rowtime, $4), <=($cor0.rowtime, +($4, 300000:INTERVAL MINUTE)))])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a])
++- Join(joinType=[InnerJoin], where=[((b = b0) AND (rowtime = rowtime0) AND (a = a0))], select=[a, b, rowtime, a0, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+ :- Exchange(distribution=[hash[b, rowtime, a]])
+ : +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])(reuse_id=[1])
+ +- Exchange(distribution=[hash[b0, rowtime0, a]])
+ +- GroupAggregate(groupBy=[a, b0, rowtime0], select=[a, b0, rowtime0])
+ +- Exchange(distribution=[hash[a, b0, rowtime0]])
+ +- Calc(select=[a, b0, rowtime0])
+ +- Join(joinType=[InnerJoin], where=[((b0 = b) AND (rowtime0 >= rowtime) AND (rowtime0 <= (rowtime + 300000:INTERVAL MINUTE)))], select=[a, b, rowtime, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+ :- Exchange(distribution=[hash[b]])
+ : +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[hash[b]])
+ +- GroupAggregate(groupBy=[b, rowtime], select=[b, rowtime])
+ +- Exchange(distribution=[hash[b, rowtime]])
+ +- Calc(select=[b, CAST(rowtime) AS rowtime])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
<TestCase name="testInteravalDiffTimeIndicator">
<Resource name="sql">
<![CDATA[
@@ -595,27 +640,27 @@ Calc(select=[a, b])
]]>
</Resource>
</TestCase>
- <TestCase name="testRowTimeRightOuterJoin">
+ <TestCase name="testRowTimeInnerJoinWithWhereClause">
<Resource name="sql">
<![CDATA[
-SELECT t1.a, t2.b
-FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
+SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE
t1.a = t2.a AND
- t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR
+ t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' MINUTE AND t2.rowtime + INTERVAL '1' HOUR
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$6])
-+- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[right])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
++- LogicalFilter(condition=[AND(=($0, $5), >=($4, -($9, 600000:INTERVAL MINUTE)), <=($4, +($9, 3600000:INTERVAL HOUR)))])
+ +- LogicalJoin(condition=[true], joinType=[inner])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- IntervalJoin(joinType=[RightOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, rowtime])
: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -625,27 +670,27 @@ Calc(select=[a, b])
]]>
</Resource>
</TestCase>
- <TestCase name="testRowTimeInnerJoinWithWhereClause">
+ <TestCase name="testRowTimeLeftOuterJoin">
<Resource name="sql">
<![CDATA[
-SELECT t1.a, t2.b FROM MyTable t1, MyTable2 t2 WHERE
+SELECT t1.a, t2.b
+FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
t1.a = t2.a AND
- t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' MINUTE AND t2.rowtime + INTERVAL '1' HOUR
+ t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$6])
-+- LogicalFilter(condition=[AND(=($0, $5), >=($4, -($9, 600000:INTERVAL MINUTE)), <=($4, +($9, 3600000:INTERVAL HOUR)))])
- +- LogicalJoin(condition=[true], joinType=[inner])
- :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
- +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
++- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[left])
+ :- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- IntervalJoin(joinType=[InnerJoin], windowBounds=[isRowTime=true, leftLowerBound=-600000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 600000:INTERVAL MINUTE)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, rowtime])
: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -655,11 +700,11 @@ Calc(select=[a, b])
]]>
</Resource>
</TestCase>
- <TestCase name="testRowTimeLeftOuterJoin">
+ <TestCase name="testRowTimeRightOuterJoin">
<Resource name="sql">
<![CDATA[
SELECT t1.a, t2.b
-FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
+FROM MyTable t1 RIGHT OUTER JOIN MyTable2 t2 ON
t1.a = t2.a AND
t1.rowtime BETWEEN t2.rowtime - INTERVAL '10' SECOND AND t2.rowtime + INTERVAL '1' HOUR
]]>
@@ -667,7 +712,7 @@ FROM MyTable t1 LEFT OUTER JOIN MyTable2 t2 ON
<Resource name="ast">
<![CDATA[
LogicalProject(a=[$0], b=[$6])
-+- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[left])
++- LogicalJoin(condition=[AND(=($0, $5), >=($4, -($9, 10000:INTERVAL SECOND)), <=($4, +($9, 3600000:INTERVAL HOUR)))], joinType=[right])
:- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
]]>
@@ -675,7 +720,7 @@ LogicalProject(a=[$0], b=[$6])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b])
-+- IntervalJoin(joinType=[LeftOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
++- IntervalJoin(joinType=[RightOuterJoin], windowBounds=[isRowTime=true, leftLowerBound=-10000, leftUpperBound=3600000, leftTimeIndex=1, rightTimeIndex=2], where=[((a = a0) AND (rowtime >= (rowtime0 - 10000:INTERVAL SECOND)) AND (rowtime <= (rowtime0 + 3600000:INTERVAL HOUR)))], select=[a, rowtime, a0, b, rowtime0])
:- Exchange(distribution=[hash[a]])
: +- Calc(select=[a, rowtime])
: +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
@@ -685,4 +730,49 @@ Calc(select=[a, b])
]]>
</Resource>
</TestCase>
+ <TestCase name="testSemiIntervalJoinWithSimpleConditionAndGroup">
+ <Resource name="sql">
+ <![CDATA[
+SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
+ SELECT t2.a FROM MyTable2 t2
+ WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE
+ GROUP BY t2.a
+)
+ ]]>
+ </Resource>
+ <Resource name="ast">
+ <![CDATA[
+LogicalProject(a=[$0])
++- LogicalFilter(condition=[IN($0, {
+LogicalAggregate(group=[{0}])
+ LogicalProject(a=[$0])
+ LogicalFilter(condition=[AND(=($cor0.b, $1), >=($cor0.rowtime, $4), <=($cor0.rowtime, +($4, 300000:INTERVAL MINUTE)))])
+ LogicalTableScan(table=[[default_catalog, default_database, MyTable2]])
+})], variablesSet=[[$cor0]])
+ +- LogicalTableScan(table=[[default_catalog, default_database, MyTable]])
+]]>
+ </Resource>
+ <Resource name="optimized exec plan">
+ <![CDATA[
+Calc(select=[a])
++- Join(joinType=[InnerJoin], where=[((b = b0) AND (rowtime = rowtime0) AND (a = a0))], select=[a, b, rowtime, a0, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey])
+ :- Exchange(distribution=[hash[b, rowtime, a]])
+ : +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])(reuse_id=[1])
+ +- Exchange(distribution=[hash[b0, rowtime0, a]])
+ +- GroupAggregate(groupBy=[a, b0, rowtime0], select=[a, b0, rowtime0])
+ +- Exchange(distribution=[hash[a, b0, rowtime0]])
+ +- Calc(select=[a, b0, rowtime0])
+ +- Join(joinType=[InnerJoin], where=[((b0 = b) AND (rowtime0 >= rowtime) AND (rowtime0 <= (rowtime + 300000:INTERVAL MINUTE)))], select=[a, b, rowtime, b0, rowtime0], leftInputSpec=[NoUniqueKey], rightInputSpec=[HasUniqueKey])
+ :- Exchange(distribution=[hash[b]])
+ : +- Calc(select=[a, b, CAST(rowtime) AS rowtime])
+ : +- DataStreamScan(table=[[default_catalog, default_database, MyTable2]], fields=[a, b, c, proctime, rowtime])
+ +- Exchange(distribution=[hash[b]])
+ +- GroupAggregate(groupBy=[b, rowtime], select=[b, rowtime])
+ +- Exchange(distribution=[hash[b, rowtime]])
+ +- Calc(select=[b, CAST(rowtime) AS rowtime])
+ +- Reused(reference_id=[1])
+]]>
+ </Resource>
+ </TestCase>
</Root>
diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
index 7dc82f3..b57767b 100644
--- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
+++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/IntervalJoinTest.scala
@@ -493,6 +493,21 @@ class IntervalJoinTest extends TableTestBase {
">($2, $6)")
}
+ @Test
+ def testFallbackToRegularJoin(): Unit = {
+ // the following query would be translated into regular join instead of interval join because the
+ // time attribute of right side would be materialized.
+ val sql =
+ """
+ |SELECT t1.a FROM MyTable t1 WHERE t1.a IN (
+ | SELECT t2.a FROM MyTable2 t2
+ | WHERE t1.b = t2.b AND t1.rowtime between t2.rowtime and t2.rowtime + INTERVAL '5' MINUTE
+ | GROUP BY t2.a
+ |)
+ """.stripMargin
+ util.verifyExecPlan(sql)
+ }
+
private def verifyTimeBoundary(
timeConditionSql: String,
expLeftSize: Long,
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
index 66a5de2..9acfdd8 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/join/window/WindowJoinOperator.java
@@ -299,10 +299,8 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
boolean matches = false;
for (RowData rightRecord : rightRecords) {
if (joinCondition.apply(leftRecord, rightRecord)) {
- if (joinCondition.apply(leftRecord, rightRecord)) {
- matches = true;
- break;
- }
+ matches = true;
+ break;
}
}
if (matches) {