You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by go...@apache.org on 2022/09/19 03:27:20 UTC

[flink] branch release-1.16 updated: [FLINK-29280][table-planner] Fix join hints could not be propagated in subquery

This is an automated email from the ASF dual-hosted git repository.

godfrey pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.16 by this push:
     new b37a8153f22 [FLINK-29280][table-planner] Fix join hints could not be propagated in subquery
b37a8153f22 is described below

commit b37a8153f22b62982ca144604a34056246f6f36c
Author: xuyang <xy...@163.com>
AuthorDate: Tue Sep 13 18:59:43 2022 +0800

    [FLINK-29280][table-planner] Fix join hints could not be propagated in subquery
    
    This closes #20823
    
    (cherry picked from commit 22cb554008320e6684280b5205f93d7a6f685c6c)
---
 .../apache/calcite/sql2rel/RelDecorrelator.java    |   4 +
 .../plan/rules/logical/SubQueryDecorrelator.java   |  22 +-
 .../table/planner/calcite/FlinkPlannerImpl.scala   |   3 +-
 .../rules/logical/FlinkSubQueryRemoveRule.scala    |  13 +-
 .../planner/plan/hints/batch/JoinHintTestBase.java |  40 +++
 .../plan/hints/batch/BroadcastJoinHintTest.xml     | 275 ++++++++++++++++----
 .../plan/hints/batch/NestLoopJoinHintTest.xml      | 275 ++++++++++++++++----
 .../plan/hints/batch/ShuffleHashJoinHintTest.xml   | 276 +++++++++++++++++----
 .../plan/hints/batch/ShuffleMergeJoinHintTest.xml  | 276 +++++++++++++++++----
 .../optimize/ClearQueryBlockAliasResolverTest.xml  | 152 +++++++++---
 .../planner/plan/optimize/JoinHintResolverTest.xml | 156 +++++++++---
 11 files changed, 1215 insertions(+), 277 deletions(-)

diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
index 0d132041278..cf2d4de2c65 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/calcite/sql2rel/RelDecorrelator.java
@@ -17,6 +17,7 @@
 package org.apache.calcite.sql2rel;
 
 import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle;
+import org.apache.flink.table.planner.hint.FlinkHints;
 
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -207,6 +208,9 @@ public class RelDecorrelator implements ReflectiveVisitor {
 
         // ----- FLINK MODIFICATION BEGIN -----
 
+        // normalize the names of all join hints to upper case
+        newRootRel = FlinkHints.capitalizeJoinHints(newRootRel);
+
         // clear join hints which are propagated into wrong query block
         // The hint QueryBlockAlias will be added when building a RelNode tree before. It is used to
         // distinguish the query block in the SQL.
diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
index 828286b5fc6..6a27f45839e 100644
--- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
+++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/rules/logical/SubQueryDecorrelator.java
@@ -35,6 +35,7 @@ import org.apache.calcite.rel.core.Project;
 import org.apache.calcite.rel.core.SetOp;
 import org.apache.calcite.rel.core.Sort;
 import org.apache.calcite.rel.core.Values;
+import org.apache.calcite.rel.hint.Hintable;
 import org.apache.calcite.rel.logical.LogicalAggregate;
 import org.apache.calcite.rel.logical.LogicalCorrelate;
 import org.apache.calcite.rel.logical.LogicalFilter;
@@ -492,6 +493,7 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
                 }
             }
             RelNode newProject = RelOptUtil.createProject(newInput, projects, false);
+            newProject = ((LogicalProject) newProject).withHints(rel.getHints());
 
             final RexNode newCorCondition;
             if (frame.c != null) {
@@ -544,11 +546,13 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
 
             // Using LogicalFilter.create instead of RelBuilder.filter to create Filter
             // because RelBuilder.filter method does not have VariablesSet arg.
-            final LogicalFilter newFilter =
+            final RelNode newFilter =
                     LogicalFilter.create(
-                            frame.r,
-                            remainingCondition,
-                            com.google.common.collect.ImmutableSet.copyOf(rel.getVariablesSet()));
+                                    frame.r,
+                                    remainingCondition,
+                                    com.google.common.collect.ImmutableSet.copyOf(
+                                            rel.getVariablesSet()))
+                            .withHints(rel.getHints());
 
             // Adds input's correlation condition
             if (frame.c != null) {
@@ -705,7 +709,8 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
             }
 
             relBuilder.push(
-                    LogicalAggregate.create(newProject, false, newGroupSet, null, newAggCalls));
+                    LogicalAggregate.create(
+                            newProject, rel.getHints(), newGroupSet, null, newAggCalls));
 
             if (!omittedConstants.isEmpty()) {
                 final List<RexNode> postProjects = new ArrayList<>(relBuilder.fields());
@@ -876,7 +881,9 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
             RelCollation oldCollation = rel.getCollation();
             RelCollation newCollation = RexUtil.apply(mapping, oldCollation);
 
-            final Sort newSort = LogicalSort.create(newInput, newCollation, rel.offset, rel.fetch);
+            final RelNode newSort =
+                    LogicalSort.create(newInput, newCollation, rel.offset, rel.fetch)
+                            .withHints(rel.getHints());
 
             // Sort does not change input ordering
             return new Frame(rel, newSort, frame.c, frame.oldToNewOutputs);
@@ -917,6 +924,9 @@ public class SubQueryDecorrelator extends RelShuttleImpl {
                 if (!Util.equalShallow(oldInputs, newInputs)) {
                     newRel = rel.copy(rel.getTraitSet(), newInputs);
                 }
+                if (rel instanceof Hintable) {
+                    newRel = ((Hintable) newRel).withHints(((Hintable) rel).getHints());
+                }
             }
             // the output position should not change since there are no corVars coming from below.
             return new Frame(rel, newRel, null, identityMap(rel.getRowType().getFieldCount()));
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
index 07969d9e68d..84735f5f337 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/calcite/FlinkPlannerImpl.scala
@@ -46,6 +46,7 @@ import javax.annotation.Nullable
 
 import java.lang.{Boolean => JBoolean}
 import java.util
+import java.util.Locale
 import java.util.function.{Function => JFunction}
 
 import scala.collection.JavaConverters._
@@ -250,7 +251,7 @@ class FlinkPlannerImpl(
       JavaScalaConversionUtil.toScala(hints).foreach {
         case hint: SqlHint =>
           val hintName = hint.getName
-          if (JoinStrategy.isJoinStrategy(hintName)) {
+          if (JoinStrategy.isJoinStrategy(hintName.toUpperCase(Locale.ROOT))) {
             return true
           }
       }
diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala
index b87e3aea6a7..62607017313 100644
--- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala
+++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/rules/logical/FlinkSubQueryRemoveRule.scala
@@ -17,10 +17,12 @@
  */
 package org.apache.flink.table.planner.plan.rules.logical
 
+import org.apache.flink.table.planner.alias.ClearJoinHintWithInvalidPropagationShuttle
 import org.apache.flink.table.planner.calcite.{FlinkRelBuilder, FlinkRelFactories}
+import org.apache.flink.table.planner.hint.FlinkHints
 
 import com.google.common.collect.ImmutableList
-import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand}
+import org.apache.calcite.plan.{RelOptRule, RelOptRuleCall, RelOptRuleOperand, RelOptUtil}
 import org.apache.calcite.plan.RelOptRule._
 import org.apache.calcite.plan.RelOptUtil.Logic
 import org.apache.calcite.rel.{RelNode, RelShuttleImpl}
@@ -93,7 +95,14 @@ class FlinkSubQueryRemoveRule(
           relBuilder.filter(c)
         }
         relBuilder.project(fields(relBuilder, filter.getRowType.getFieldCount))
-        call.transformTo(relBuilder.build)
+        // The sub-query has been replaced with an ordinary rel node, so the hints inside it
+        // must also be resolved, using the same logic as in SqlToRelConverter.
+        val newNode = relBuilder.build
+        val nodeWithHint = RelOptUtil.propagateRelHints(newNode, false)
+        val nodeWithCapitalizedJoinHints = FlinkHints.capitalizeJoinHints(nodeWithHint)
+        val finalNode =
+          nodeWithCapitalizedJoinHints.accept(new ClearJoinHintWithInvalidPropagationShuttle)
+        call.transformTo(finalNode)
       case _ => // do nothing
     }
   }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
index cf569d5e8b7..829581d4823 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/hints/batch/JoinHintTestBase.java
@@ -829,6 +829,46 @@ public abstract class JoinHintTestBase extends TableTestBase {
         verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint())));
     }
 
+    @Test
+    public void testJoinHintWithJoinHintInSubQuery() {
+        String sql =
+                "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3)";
+
+        verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint())));
+    }
+
+    @Test
+    public void testJoinHintWithJoinHintInCorrelateAndWithFilter() {
+        String sql =
+                "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3 where T1.a1 = T2.a2)";
+
+        verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint())));
+    }
+
+    @Test
+    public void testJoinHintWithJoinHintInCorrelateAndWithProject() {
+        String sql =
+                "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ a2 + T1.a1 from T2 join T3 on T2.a2 = T3.a3)";
+
+        verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint())));
+    }
+
+    @Test
+    public void testJoinHintWithJoinHintInCorrelateAndWithAgg() {
+        String sql =
+                "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ count(T2.a2) from T2 join T1 on T2.a2 = T1.a1 group by T1.a1)";
+
+        verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint())));
+    }
+
+    @Test
+    public void testJoinHintWithJoinHintInCorrelateAndWithSortLimit() {
+        String sql =
+                "select * from T1 WHERE a1 IN (select /*+ %s(T2) */ T2.a2 from T2 join T1 on T2.a2 = T1.a1 order by T1.a1 limit 10)";
+
+        verifyRelPlanByCustom(String.format(sql, buildCaseSensitiveStr(getTestSingleJoinHint())));
+    }
+
     protected String buildAstPlanWithQueryBlockAlias(List<RelNode> relNodes) {
         StringBuilder astBuilder = new StringBuilder();
         relNodes.forEach(
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
index f93f67a017a..7ecc331564f 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/BroadcastJoinHintTest.xml
@@ -613,6 +613,200 @@ HashJoin(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], b
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithAgg">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ count(T2.a2) from T2 join T1 on T2.a2 = T1.a1 group by T1.a1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+    LogicalProject(a1=[$2], a2=[$0])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], select=[a1, b1], build=[right], tryDistinctBuildRow=[true])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[EXPR$0]])
+   +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1, Final_COUNT(count$0) AS EXPR$0])
+            +- Exchange(distribution=[hash[a1]])
+               +- LocalHashAggregate(groupBy=[a1], select=[a1, Partial_COUNT(a2) AS count$0])
+                  +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], isBroadcast=[true], build=[right])
+                     :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                     +- Exchange(distribution=[broadcast])
+                        +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithFilter">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3 where T1.a1 = T2.a2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalFilter(condition=[=($cor0.a1, $0)])
+    LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+      LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- Calc(select=[a2])
+      +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[right])
+         :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- Exchange(distribution=[broadcast])
+            +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithProject">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + T1.a1 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[right])
+                        :     :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :     +- Exchange(distribution=[broadcast])
+                        :        +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], select=[a1])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithoutJoinPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1, T2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
++- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+:- Exchange(distribution=[broadcast])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithSortLimit">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ T2.a2 from T2 join T1 on T2.a2 = T1.a1 order by T1.a1 limit 10)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
+    LogicalProject(a2=[$0], a1=[$2])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcast=[true], build=[right])
+:- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[a2])
+      +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true])
+         +- Exchange(distribution=[single])
+            +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[false])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], isBroadcast=[true], build=[right])
+                  :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                  +- Exchange(distribution=[broadcast])
+                     +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- Calc(select=[a2])
+      +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], isBroadcast=[true], build=[right])
+         :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- Exchange(distribution=[broadcast])
+            +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -836,27 +1030,6 @@ HashJoin(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a3, b3
 :     +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 +- Exchange(distribution=[broadcast])
    +- TableSourceScan(table=[[default_catalog, default_database, T3]], fields=[a3, b3])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithNonEquiPred">
-    <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
-:- Exchange(distribution=[broadcast])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 ]]>
     </Resource>
   </TestCase>
@@ -1159,8 +1332,8 @@ Calc(select=[a1, CAST('abc' AS VARCHAR(2147483647)) AS b1])
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
 +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -1172,21 +1345,45 @@ HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], isBro
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinHintWithoutJoinPred">
+  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
     <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1, T2]]>
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1 where a1 in (select a2 from T2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithNonEquiPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[BROADCAST inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
 :- Exchange(distribution=[broadcast])
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
@@ -1233,30 +1430,6 @@ HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2],
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
-    <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1 where a1 in (select a2 from T2)]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1])
-+- LogicalFilter(condition=[IN($0, {
-LogicalProject(a2=[$0])
-  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-})])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
-:- Exchange(distribution=[hash[a1]])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- Exchange(distribution=[hash[a2]])
-   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
index e7ff453d232..f771fa2f933 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/NestLoopJoinHintTest.xml
@@ -612,6 +612,200 @@ NestedLoopJoin(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2,
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[single])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithAgg">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ NeSt_lOoP(T2) */ count(T2.a2) from T2 join T1 on T2.a2 = T1.a1 group by T1.a1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+    LogicalProject(a1=[$2], a2=[$0])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], select=[a1, b1], build=[right], tryDistinctBuildRow=[true])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[EXPR$0]])
+   +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1, Final_COUNT(count$0) AS EXPR$0])
+            +- Exchange(distribution=[hash[a1]])
+               +- LocalHashAggregate(groupBy=[a1], select=[a1, Partial_COUNT(a2) AS count$0])
+                  +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right])
+                     :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                     +- Exchange(distribution=[broadcast])
+                        +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithFilter">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ NeSt_lOoP(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3 where T1.a1 = T2.a2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalFilter(condition=[=($cor0.a1, $0)])
+    LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+      LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- Calc(select=[a2])
+      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right])
+         :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- Exchange(distribution=[broadcast])
+            +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithProject">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ NeSt_lOoP(T2) */ a2 + T1.a1 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right])
+                        :     :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :     +- Exchange(distribution=[broadcast])
+                        :        +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], select=[a1])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithoutJoinPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ NEST_LOOP(T1) */* from T1, T2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
++- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[NEST_LOOP inheritPath:[0] options:[T1]]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+:- Exchange(distribution=[broadcast])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithSortLimit">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ NeSt_lOoP(T2) */ T2.a2 from T2 join T1 on T2.a2 = T1.a1 order by T1.a1 limit 10)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
+    LogicalProject(a2=[$0], a1=[$2])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcast=[true], build=[right])
+:- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[a2])
+      +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true])
+         +- Exchange(distribution=[single])
+            +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[false])
+               +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right])
+                  :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                  +- Exchange(distribution=[broadcast])
+                     +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ NeSt_lOoP(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- Calc(select=[a2])
+      +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right])
+         :- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+         +- Exchange(distribution=[broadcast])
+            +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -833,27 +1027,6 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2,
 :     +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 +- Exchange(distribution=[broadcast])
    +- TableSourceScan(table=[[default_catalog, default_database, T3]], fields=[a3, b3])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithNonEquiPred">
-    <Resource name="sql">
-      <![CDATA[select /*+ NEST_LOOP(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
-:- Exchange(distribution=[broadcast])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 ]]>
     </Resource>
   </TestCase>
@@ -1156,8 +1329,8 @@ Calc(select=[a1, CAST('abc' AS VARCHAR(2147483647)) AS b1])
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
 +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -1169,21 +1342,45 @@ NestedLoopJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2],
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinHintWithoutJoinPred">
+  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
     <Resource name="sql">
-      <![CDATA[select /*+ NEST_LOOP(T1) */* from T1, T2]]>
+      <![CDATA[select /*+ NEST_LOOP(T1) */* from T1 where a1 in (select a2 from T2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithNonEquiPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ NEST_LOOP(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[NEST_LOOP inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[NEST_LOOP inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
 :- Exchange(distribution=[broadcast])
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
@@ -1229,30 +1426,6 @@ NestedLoopJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2,
 :- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[broadcast])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
-    <Resource name="sql">
-      <![CDATA[select /*+ NEST_LOOP(T1) */* from T1 where a1 in (select a2 from T2)]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1])
-+- LogicalFilter(condition=[IN($0, {
-LogicalProject(a2=[$0])
-  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-})])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
-:- Exchange(distribution=[hash[a1]])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- Exchange(distribution=[hash[a2]])
-   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
index 0d295149aca..00fbfb8ac69 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleHashJoinHintTest.xml
@@ -633,6 +633,201 @@ HashJoin(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], b
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithAgg">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_HaSh(T2) */ count(T2.a2) from T2 join T1 on T2.a2 = T1.a1 group by T1.a1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+    LogicalProject(a1=[$2], a2=[$0])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], select=[a1, b1], build=[right], tryDistinctBuildRow=[true])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[EXPR$0]])
+   +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(a2) AS EXPR$0])
+            +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right])
+               :- Exchange(distribution=[hash[a2]])
+               :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+               +- Exchange(distribution=[hash[a1]])
+                  +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithFilter">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_HaSh(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3 where T1.a1 = T2.a2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalFilter(condition=[=($cor0.a1, $0)])
+    LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+      LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Calc(select=[a2])
+   +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right])
+      :- Exchange(distribution=[hash[a2]])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+      +- Exchange(distribution=[hash[a3]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithProject">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_HaSh(T2) */ a2 + T1.a1 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right])
+                        :     :- Exchange(distribution=[hash[a2]])
+                        :     :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :     +- Exchange(distribution=[hash[a3]])
+                        :        +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], select=[a1])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithoutJoinPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ SHUFFLE_HASH(T1) */* from T1, T2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
++- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+:- Exchange(distribution=[broadcast])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithSortLimit">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_HaSh(T2) */ T2.a2 from T2 join T1 on T2.a2 = T1.a1 order by T1.a1 limit 10)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
+    LogicalProject(a2=[$0], a1=[$2])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcast=[true], build=[right])
+:- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[a2])
+      +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true])
+         +- Exchange(distribution=[single])
+            +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[false])
+               +- HashJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1], build=[right])
+                  :- Exchange(distribution=[hash[a2]])
+                  :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                  +- Exchange(distribution=[hash[a1]])
+                     +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_HaSh(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Calc(select=[a2])
+   +- HashJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3], build=[right])
+      :- Exchange(distribution=[hash[a2]])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+      +- Exchange(distribution=[hash[a3]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -866,27 +1061,6 @@ HashJoin(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a3, b3
 :        +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 +- Exchange(distribution=[hash[b3]])
    +- TableSourceScan(table=[[default_catalog, default_database, T3]], fields=[a3, b3])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithNonEquiPred">
-    <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_HASH(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
-:- Exchange(distribution=[broadcast])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 ]]>
     </Resource>
   </TestCase>
@@ -1189,8 +1363,8 @@ Calc(select=[a1, CAST('abc' AS VARCHAR(2147483647)) AS b1])
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
 +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -1203,21 +1377,45 @@ HashJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2], build
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinHintWithoutJoinPred">
+  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
     <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_HASH(T1) */* from T1, T2]]>
+      <![CDATA[select /*+ SHUFFLE_HASH(T1) */* from T1 where a1 in (select a2 from T2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithNonEquiPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ SHUFFLE_HASH(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_HASH inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
 :- Exchange(distribution=[broadcast])
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
@@ -1265,30 +1463,6 @@ HashJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2],
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
-    <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_HASH(T1) */* from T1 where a1 in (select a2 from T2)]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1])
-+- LogicalFilter(condition=[IN($0, {
-LogicalProject(a2=[$0])
-  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-})])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
-:- Exchange(distribution=[hash[a1]])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- Exchange(distribution=[hash[a2]])
-   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
index c101faa4e50..9600cb51a53 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/hints/batch/ShuffleMergeJoinHintTest.xml
@@ -633,6 +633,201 @@ SortMergeJoin(joinType=[FullOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2, b
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithAgg">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_MeRgE(T2) */ count(T2.a2) from T2 join T1 on T2.a2 = T1.a1 group by T1.a1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+    LogicalProject(a1=[$2], a2=[$0])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, EXPR$0)], select=[a1, b1], build=[right], tryDistinctBuildRow=[true])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[EXPR$0]])
+   +- LocalHashAggregate(groupBy=[EXPR$0], select=[EXPR$0])
+      +- Calc(select=[EXPR$0])
+         +- SortAggregate(isMerge=[false], groupBy=[a1], select=[a1, COUNT(a2) AS EXPR$0])
+            +- SortMergeJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1])
+               :- Exchange(distribution=[hash[a2]])
+               :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+               +- Exchange(distribution=[hash[a1]])
+                  +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithFilter">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_MeRgE(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3 where T1.a1 = T2.a2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalFilter(condition=[=($cor0.a1, $0)])
+    LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+      LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Calc(select=[a2])
+   +- SortMergeJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3])
+      :- Exchange(distribution=[hash[a2]])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+      +- Exchange(distribution=[hash[a3]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithProject">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_MeRgE(T2) */ a2 + T1.a1 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+Calc(select=[a1, b1])
++- HashJoin(joinType=[InnerJoin], where=[=(a1, EXPR$0)], select=[a1, b1, EXPR$0], build=[left])
+   :- Exchange(distribution=[hash[a1]])
+   :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
+   +- Exchange(distribution=[hash[EXPR$0]])
+      +- Calc(select=[EXPR$0])
+         +- HashAggregate(isMerge=[true], groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+            +- Exchange(distribution=[hash[EXPR$0, a1]])
+               +- LocalHashAggregate(groupBy=[EXPR$0, a1], select=[EXPR$0, a1])
+                  +- Calc(select=[+(a2, a1) AS EXPR$0, a1])
+                     +- NestedLoopJoin(joinType=[InnerJoin], where=[=(+(a2, a1), a1)], select=[a2, a1], build=[right])
+                        :- Calc(select=[a2])
+                        :  +- SortMergeJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3])
+                        :     :- Exchange(distribution=[hash[a2]])
+                        :     :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                        :     +- Exchange(distribution=[hash[a3]])
+                        :        +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
+                        +- Exchange(distribution=[broadcast])
+                           +- HashAggregate(isMerge=[true], groupBy=[a1], select=[a1])
+                              +- Exchange(distribution=[hash[a1]])
+                                 +- LocalHashAggregate(groupBy=[a1], select=[a1])
+                                    +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithoutJoinPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ SHUFFLE_MERGE(T1) */* from T1, T2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
++- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]]]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+:- Exchange(distribution=[broadcast])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithSortLimit">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_MeRgE(T2) */ T2.a2 from T2 join T1 on T2.a2 = T1.a1 order by T1.a1 limit 10)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
+    LogicalProject(a2=[$0], a1=[$2])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], isBroadcast=[true], build=[right])
+:- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[broadcast])
+   +- Calc(select=[a2])
+      +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[true])
+         +- Exchange(distribution=[single])
+            +- SortLimit(orderBy=[a1 ASC], offset=[0], fetch=[10], global=[false])
+               +- SortMergeJoin(joinType=[InnerJoin], where=[=(a2, a1)], select=[a2, a1])
+                  :- Exchange(distribution=[hash[a2]])
+                  :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+                  +- Exchange(distribution=[hash[a1]])
+                     +- TableSourceScan(table=[[default_catalog, default_database, T1, project=[a1], metadata=[]]], fields=[a1], hints=[[[ALIAS options:[T1]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ ShUfFlE_MeRgE(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Calc(select=[a2])
+   +- SortMergeJoin(joinType=[InnerJoin], where=[=(a2, a3)], select=[a2, a3])
+      :- Exchange(distribution=[hash[a2]])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+      +- Exchange(distribution=[hash[a3]])
+         +- TableSourceScan(table=[[default_catalog, default_database, T3, project=[a3], metadata=[]]], fields=[a3], hints=[[[ALIAS options:[T3]]]])
 ]]>
     </Resource>
   </TestCase>
@@ -866,27 +1061,6 @@ SortMergeJoin(joinType=[InnerJoin], where=[=(b2, b3)], select=[a1, b1, a2, b2, a
 :        +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 +- Exchange(distribution=[hash[b3]])
    +- TableSourceScan(table=[[default_catalog, default_database, T3]], fields=[a3, b3])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithNonEquiPred">
-    <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_MERGE(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
-:- Exchange(distribution=[broadcast])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
 ]]>
     </Resource>
   </TestCase>
@@ -1189,8 +1363,8 @@ Calc(select=[a1, CAST('abc' AS VARCHAR(2147483647)) AS b1])
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
 +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]]]])
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]])
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
@@ -1203,21 +1377,45 @@ SortMergeJoin(joinType=[InnerJoin], where=[=(a1, a2)], select=[a1, b1, a2, b2])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinHintWithoutJoinPred">
+  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
     <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_MERGE(T1) */* from T1, T2]]>
+      <![CDATA[select /*+ SHUFFLE_MERGE(T1) */* from T1 where a1 in (select a2 from T2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1])
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+})])
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+]]>
+    </Resource>
+    <Resource name="optimized rel plan">
+      <![CDATA[
+HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
+:- Exchange(distribution=[hash[a1]])
+:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
++- Exchange(distribution=[hash[a2]])
+   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithNonEquiPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ SHUFFLE_MERGE(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3])
-+- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]]]])
++- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[SHUFFLE_MERGE inheritPath:[0] options:[T1]]]])
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-NestedLoopJoin(joinType=[InnerJoin], where=[true], select=[a1, b1, a2, b2], build=[left])
+NestedLoopJoin(joinType=[InnerJoin], where=[>(a1, a2)], select=[a1, b1, a2, b2], build=[left])
 :- Exchange(distribution=[broadcast])
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
@@ -1265,30 +1463,6 @@ SortMergeJoin(joinType=[RightOuterJoin], where=[=(a1, a2)], select=[a1, b1, a2,
 :  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
 +- Exchange(distribution=[hash[a2]])
    +- TableSourceScan(table=[[default_catalog, default_database, T2]], fields=[a2, b2])
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
-    <Resource name="sql">
-      <![CDATA[select /*+ SHUFFLE_MERGE(T1) */* from T1 where a1 in (select a2 from T2)]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1])
-+- LogicalFilter(condition=[IN($0, {
-LogicalProject(a2=[$0])
-  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-})])
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
-]]>
-    </Resource>
-    <Resource name="optimized rel plan">
-      <![CDATA[
-HashJoin(joinType=[LeftSemiJoin], where=[=(a1, a2)], select=[a1, b1], build=[right])
-:- Exchange(distribution=[hash[a1]])
-:  +- TableSourceScan(table=[[default_catalog, default_database, T1]], fields=[a1, b1])
-+- Exchange(distribution=[hash[a2]])
-   +- TableSourceScan(table=[[default_catalog, default_database, T2, project=[a2], metadata=[]]], fields=[a2], hints=[[[ALIAS options:[T2]]]])
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
index 7dea9c92062..cedbacf095e 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/ClearQueryBlockAliasResolverTest.xml
@@ -343,6 +343,109 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a
 +- LogicalJoin(condition=[=($0, $2)], joinType=[full], joinHints=[[[BROADCAST options:[RIGHT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithAgg">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ count(T2.a2) from T2 join T1 on T2.a2 = T1.a1 group by T1.a1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+    LogicalProject(a1=[$2], a2=[$0])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithFilter">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3 where T1.a1 = T2.a2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalFilter(condition=[=($cor0.a1, $0)])
+    LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+      LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithProject">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + T1.a1 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithoutJoinPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1, T2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithSortLimit">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ T2.a2 from T2 join T1 on T2.a2 = T1.a1 order by T1.a1 limit 10)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
+    LogicalProject(a2=[$0], a1=[$2])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
 ]]>
     </Resource>
   </TestCase>
@@ -475,19 +578,6 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], a3=[$4], b3=[$5]), rowType=[R
       :  :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
       :  +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
       +- LogicalTableScan(table=[[default_catalog, default_database, T3]]), rowType=[RecordType(BIGINT a3, VARCHAR(2147483647) b3)]
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithNonEquiPred">
-    <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
 ]]>
     </Resource>
   </TestCase>
@@ -658,14 +748,29 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinHintWithoutJoinPred">
+  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
     <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1, T2]]>
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1 where a1 in (select a2 from T2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithNonEquiPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
 ]]>
@@ -694,21 +799,6 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a
 +- LogicalJoin(condition=[=($0, $2)], joinType=[right], joinHints=[[[BROADCAST options:[RIGHT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
-    <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1 where a1 in (select a2 from T2)]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
-+- LogicalFilter(condition=[IN($0, {
-LogicalProject(a2=[$0])
-  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
 ]]>
     </Resource>
   </TestCase>
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml
index 3368880598e..51a413d7ea8 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/optimize/JoinHintResolverTest.xml
@@ -343,6 +343,109 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a
 +- LogicalJoin(condition=[=($0, $2)], joinType=[full], joinHints=[[[BROADCAST options:[RIGHT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithAgg">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ count(T2.a2) from T2 join T1 on T2.a2 = T1.a1 group by T1.a1)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[$1])
+  LogicalAggregate(group=[{0}], EXPR$0=[COUNT($1)])
+    LogicalProject(a1=[$2], a2=[$0])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithFilter">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3 where T1.a1 = T2.a2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalFilter(condition=[=($cor0.a1, $0)])
+    LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+      LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+      LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithProject">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 + T1.a1 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(EXPR$0=[+($0, $cor0.a1)])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})], variablesSet=[[$cor0]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithoutJoinPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1, T2]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInCorrelateAndWithSortLimit">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ T2.a2 from T2 join T1 on T2.a2 = T1.a1 order by T1.a1 limit 10)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalSort(sort0=[$1], dir0=[ASC-nulls-first], fetch=[10])
+    LogicalProject(a2=[$0], a1=[$2])
+      LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+        LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+        LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithJoinHintInSubQuery">
+    <Resource name="sql">
+      <![CDATA[select * from T1 WHERE a1 IN (select /*+ BrOaDcAsT(T2) */ a2 from T2 join T3 on T2.a2 = T3.a3)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalJoin(condition=[=($0, $2)], joinType=[inner])
+    LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+    LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
 ]]>
     </Resource>
   </TestCase>
@@ -475,19 +578,6 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3], a3=[$4], b3=[$5]), rowType=[R
       :  :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
       :  +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
       +- LogicalTableScan(table=[[default_catalog, default_database, T3]], hints=[[[ALIAS inheritPath:[] options:[T3]]]]), rowType=[RecordType(BIGINT a3, VARCHAR(2147483647) b3)]
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithNonEquiPred">
-    <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
 ]]>
     </Resource>
   </TestCase>
@@ -653,19 +743,34 @@ LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
 +- LogicalJoin(condition=[=($0, $2)], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-   :- LogicalTableScan(table=[[default_catalog, default_database, T1]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
-   +- LogicalTableScan(table=[[default_catalog, default_database, T2]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
+   :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testJoinHintWithoutJoinPred">
+  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
     <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1, T2]]>
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1 where a1 in (select a2 from T2)]]>
+    </Resource>
+    <Resource name="ast">
+      <![CDATA[
+LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
++- LogicalFilter(condition=[IN($0, {
+LogicalProject(a2=[$0])
+  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
+})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
+]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testJoinHintWithNonEquiPred">
+    <Resource name="sql">
+      <![CDATA[select /*+ BROADCAST(T1) */* from T1 inner join T2 on T1.a1 > T2.a2]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
-+- LogicalJoin(condition=[true], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
++- LogicalJoin(condition=[>($0, $2)], joinType=[inner], joinHints=[[[BROADCAST options:[LEFT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
 ]]>
@@ -694,21 +799,6 @@ LogicalProject(a1=[$0], b1=[$1], a2=[$2], b2=[$3]), rowType=[RecordType(BIGINT a
 +- LogicalJoin(condition=[=($0, $2)], joinType=[right], joinHints=[[[BROADCAST options:[RIGHT]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1, BIGINT a2, VARCHAR(2147483647) b2)]
    :- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
    +- LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]]), rowType=[RecordType(BIGINT a2, VARCHAR(2147483647) b2)]
-]]>
-    </Resource>
-  </TestCase>
-  <TestCase name="testJoinHintWithSemiJoinAndLeftSideAsBuildSide">
-    <Resource name="sql">
-      <![CDATA[select /*+ BROADCAST(T1) */* from T1 where a1 in (select a2 from T2)]]>
-    </Resource>
-    <Resource name="ast">
-      <![CDATA[
-LogicalProject(a1=[$0], b1=[$1]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
-+- LogicalFilter(condition=[IN($0, {
-LogicalProject(a2=[$0])
-  LogicalTableScan(table=[[default_catalog, default_database, T2]], hints=[[[ALIAS inheritPath:[] options:[T2]]]])
-})]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
-   +- LogicalTableScan(table=[[default_catalog, default_database, T1]], hints=[[[ALIAS inheritPath:[] options:[T1]]]]), rowType=[RecordType(BIGINT a1, VARCHAR(2147483647) b1)]
 ]]>
     </Resource>
   </TestCase>